directory structure change to work with glide

glide has its own directory layout requirements. My previous workaround caused some code check-in errors, so this needs to be fixed.
Chris Lu
2016-06-02 18:09:14 -07:00
parent caeffa3998
commit 5ce6bbf076
166 changed files with 296 additions and 272 deletions
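For context, glide resolves every package by its canonical import path and vendors dependencies under a top-level vendor/ directory, so the project's own code has to sit at paths that match the import strings used throughout the tree (for example github.com/chrislusf/seaweedfs/weed/topology in the files below). A sketch of the expected layout, inferred from the import paths in these diffs; the glide files and vendored contents are illustrative, not the repository's actual files:

$GOPATH/src/github.com/chrislusf/seaweedfs/
    glide.yaml    # declares dependencies such as github.com/chrislusf/raft
    glide.lock    # pinned revisions, written by `glide update`
    vendor/       # dependency sources, populated by `glide install`
    weed/
        topology/ # the package shown below
        storage/
        util/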


@@ -0,0 +1,35 @@
package topology
import (
"encoding/json"
"errors"
"fmt"
"net/url"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
type AllocateVolumeResult struct {
Error string
}
func AllocateVolume(dn *DataNode, vid storage.VolumeId, option *VolumeGrowOption) error {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("collection", option.Collection)
values.Add("replication", option.ReplicaPlacement.String())
values.Add("ttl", option.Ttl.String())
jsonBlob, err := util.Post("http://"+dn.Url()+"/admin/assign_volume", values)
if err != nil {
return err
}
var ret AllocateVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return fmt.Errorf("Invalid JSON result for %s: %s", "/admin/assign_volum", string(jsonBlob))
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}


@@ -0,0 +1,31 @@
package topology
import (
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
type MaxVolumeIdCommand struct {
MaxVolumeId storage.VolumeId `json:"maxVolumeId"`
}
func NewMaxVolumeIdCommand(value storage.VolumeId) *MaxVolumeIdCommand {
return &MaxVolumeIdCommand{
MaxVolumeId: value,
}
}
func (c *MaxVolumeIdCommand) CommandName() string {
return "MaxVolumeId"
}
func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
topo := server.Context().(*Topology)
before := topo.GetMaxVolumeId()
topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
glog.V(4).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
return nil, nil
}


@@ -0,0 +1,57 @@
package topology
import (
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Collection struct {
Name string
volumeSizeLimit uint64
storageType2VolumeLayout *util.ConcurrentReadMap
}
func NewCollection(name string, volumeSizeLimit uint64) *Collection {
c := &Collection{Name: name, volumeSizeLimit: volumeSizeLimit}
c.storageType2VolumeLayout = util.NewConcurrentReadMap()
return c
}
func (c *Collection) String() string {
return fmt.Sprintf("Name:%s, volumeSizeLimit:%d, storageType2VolumeLayout:%v", c.Name, c.volumeSizeLimit, c.storageType2VolumeLayout)
}
func (c *Collection) GetOrCreateVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
keyString := rp.String()
if ttl != nil {
keyString += ttl.String()
}
vl := c.storageType2VolumeLayout.Get(keyString, func() interface{} {
return NewVolumeLayout(rp, ttl, c.volumeSizeLimit)
})
return vl.(*VolumeLayout)
}
func (c *Collection) Lookup(vid storage.VolumeId) []*DataNode {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).Lookup(vid); list != nil {
return list
}
}
}
return nil
}
func (c *Collection) ListVolumeServers() (nodes []*DataNode) {
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
if list := vl.(*VolumeLayout).ListVolumeServers(); list != nil {
nodes = append(nodes, list...)
}
}
}
return
}


@@ -0,0 +1,65 @@
package topology
import (
"encoding/xml"
)
type loc struct {
dcName string
rackName string
}
type rack struct {
Name string `xml:"name,attr"`
Ips []string `xml:"Ip"`
}
type dataCenter struct {
Name string `xml:"name,attr"`
Racks []rack `xml:"Rack"`
}
type topology struct {
DataCenters []dataCenter `xml:"DataCenter"`
}
type Configuration struct {
XMLName xml.Name `xml:"Configuration"`
Topo topology `xml:"Topology"`
ip2location map[string]loc
}
func NewConfiguration(b []byte) (*Configuration, error) {
c := &Configuration{}
err := xml.Unmarshal(b, c)
c.ip2location = make(map[string]loc)
for _, dc := range c.Topo.DataCenters {
for _, rack := range dc.Racks {
for _, ip := range rack.Ips {
c.ip2location[ip] = loc{dcName: dc.Name, rackName: rack.Name}
}
}
}
return c, err
}
func (c *Configuration) String() string {
if b, e := xml.MarshalIndent(c, " ", " "); e == nil {
return string(b)
}
return ""
}
func (c *Configuration) Locate(ip string, dcName string, rackName string) (dc string, rack string) {
if c != nil && c.ip2location != nil {
if loc, ok := c.ip2location[ip]; ok {
return loc.dcName, loc.rackName
}
}
if dcName == "" {
dcName = "DefaultDataCenter"
}
if rackName == "" {
rackName = "DefaultRack"
}
return dcName, rackName
}


@@ -0,0 +1,42 @@
package topology
import (
"fmt"
"testing"
)
func TestLoadConfiguration(t *testing.T) {
confContent := `
<?xml version="1.0" encoding="UTF-8" ?>
<Configuration>
<Topology>
<DataCenter name="dc1">
<Rack name="rack1">
<Ip>192.168.1.1</Ip>
</Rack>
</DataCenter>
<DataCenter name="dc2">
<Rack name="rack1">
<Ip>192.168.1.2</Ip>
</Rack>
<Rack name="rack2">
<Ip>192.168.1.3</Ip>
<Ip>192.168.1.4</Ip>
</Rack>
</DataCenter>
</Topology>
</Configuration>
`
c, err := NewConfiguration([]byte(confContent))
fmt.Printf("%s\n", c)
if err != nil {
t.Fatalf("unmarshal error:%v", err)
}
if len(c.Topo.DataCenters) <= 0 || c.Topo.DataCenters[0].Name != "dc1" {
t.Fatalf("unmarshal error:%s", c)
}
}


@@ -0,0 +1,40 @@
package topology
type DataCenter struct {
NodeImpl
}
func NewDataCenter(id string) *DataCenter {
dc := &DataCenter{}
dc.id = NodeId(id)
dc.nodeType = "DataCenter"
dc.children = make(map[NodeId]Node)
dc.NodeImpl.value = dc
return dc
}
func (dc *DataCenter) GetOrCreateRack(rackName string) *Rack {
for _, c := range dc.Children() {
rack := c.(*Rack)
if string(rack.Id()) == rackName {
return rack
}
}
rack := NewRack(rackName)
dc.LinkChildNode(rack)
return rack
}
func (dc *DataCenter) ToMap() interface{} {
m := make(map[string]interface{})
m["Id"] = dc.Id()
m["Max"] = dc.GetMaxVolumeCount()
m["Free"] = dc.FreeSpace()
var racks []interface{}
for _, c := range dc.Children() {
rack := c.(*Rack)
racks = append(racks, rack.ToMap())
}
m["Racks"] = racks
return m
}

weed/topology/data_node.go

@@ -0,0 +1,115 @@
package topology
import (
"fmt"
"strconv"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
type DataNode struct {
NodeImpl
volumes map[storage.VolumeId]storage.VolumeInfo
Ip string
Port int
PublicUrl string
LastSeen int64 // unix time in seconds
Dead bool
}
func NewDataNode(id string) *DataNode {
s := &DataNode{}
s.id = NodeId(id)
s.nodeType = "DataNode"
s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
s.NodeImpl.value = s
return s
}
func (dn *DataNode) String() string {
dn.RLock()
defer dn.RUnlock()
return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
}
func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
dn.Lock()
defer dn.Unlock()
if _, ok := dn.volumes[v.Id]; !ok {
dn.volumes[v.Id] = v
dn.UpAdjustVolumeCountDelta(1)
if !v.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(1)
}
dn.UpAdjustMaxVolumeId(v.Id)
} else {
dn.volumes[v.Id] = v
}
}
func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
for _, v := range actualVolumes {
actualVolumeMap[v.Id] = v
}
dn.RLock()
for vid, v := range dn.volumes {
if _, ok := actualVolumeMap[vid]; !ok {
glog.V(0).Infoln("Deleting volume id:", vid)
delete(dn.volumes, vid)
deletedVolumes = append(deletedVolumes, v)
dn.UpAdjustVolumeCountDelta(-1)
dn.UpAdjustActiveVolumeCountDelta(-1)
}
} //TODO: adjust max volume id, if need to reclaim volume ids
dn.RUnlock()
for _, v := range actualVolumes {
dn.AddOrUpdateVolume(v)
}
return
}
func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
dn.RLock()
for _, v := range dn.volumes {
ret = append(ret, v)
}
dn.RUnlock()
return ret
}
func (dn *DataNode) GetDataCenter() *DataCenter {
return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
}
func (dn *DataNode) GetRack() *Rack {
return dn.Parent().(*NodeImpl).value.(*Rack)
}
func (dn *DataNode) GetTopology() *Topology {
p := dn.Parent()
for p.Parent() != nil {
p = p.Parent()
}
t := p.(*Topology)
return t
}
func (dn *DataNode) MatchLocation(ip string, port int) bool {
return dn.Ip == ip && dn.Port == port
}
func (dn *DataNode) Url() string {
return dn.Ip + ":" + strconv.Itoa(dn.Port)
}
func (dn *DataNode) ToMap() interface{} {
ret := make(map[string]interface{})
ret["Url"] = dn.Url()
ret["Volumes"] = dn.GetVolumeCount()
ret["Max"] = dn.GetMaxVolumeCount()
ret["Free"] = dn.FreeSpace()
ret["PublicUrl"] = dn.PublicUrl
return ret
}

weed/topology/node.go

@@ -0,0 +1,272 @@
package topology
import (
"errors"
"math/rand"
"strings"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
type NodeId string
type Node interface {
Id() NodeId
String() string
FreeSpace() int
ReserveOneVolume(r int) (*DataNode, error)
UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int)
UpAdjustVolumeCountDelta(volumeCountDelta int)
UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int)
UpAdjustMaxVolumeId(vid storage.VolumeId)
GetVolumeCount() int
GetActiveVolumeCount() int
GetMaxVolumeCount() int
GetMaxVolumeId() storage.VolumeId
SetParent(Node)
LinkChildNode(node Node)
UnlinkChildNode(nodeId NodeId)
CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64)
IsDataNode() bool
IsRack() bool
IsDataCenter() bool
Children() []Node
Parent() Node
GetValue() interface{} //get reference to the topology,dc,rack,datanode
}
type NodeImpl struct {
id NodeId
volumeCount int
activeVolumeCount int
maxVolumeCount int
parent Node
sync.RWMutex // lock children
children map[NodeId]Node
maxVolumeId storage.VolumeId
//for rack, data center, topology
nodeType string
value interface{}
}
// the first node must satisfy filterFirstNodeFn(), the rest nodes must have one free slot
func (n *NodeImpl) RandomlyPickNodes(numberOfNodes int, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
candidates := make([]Node, 0, len(n.children))
var errs []string
n.RLock()
for _, node := range n.children {
if err := filterFirstNodeFn(node); err == nil {
candidates = append(candidates, node)
} else {
errs = append(errs, string(node.Id())+":"+err.Error())
}
}
n.RUnlock()
if len(candidates) == 0 {
return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
}
firstNode = candidates[rand.Intn(len(candidates))]
glog.V(2).Infoln(n.Id(), "picked main node:", firstNode.Id())
restNodes = make([]Node, numberOfNodes-1)
candidates = candidates[:0]
n.RLock()
for _, node := range n.children {
if node.Id() == firstNode.Id() {
continue
}
if node.FreeSpace() <= 0 {
continue
}
glog.V(2).Infoln("select rest node candidate:", node.Id())
candidates = append(candidates, node)
}
n.RUnlock()
glog.V(2).Infoln(n.Id(), "picking", numberOfNodes-1, "from rest", len(candidates), "node candidates")
ret := len(restNodes) == 0
for k, node := range candidates {
if k < len(restNodes) {
restNodes[k] = node
if k == len(restNodes)-1 {
ret = true
}
} else {
r := rand.Intn(k + 1)
if r < len(restNodes) {
restNodes[r] = node
}
}
}
if !ret {
glog.V(2).Infoln(n.Id(), "failed to pick", numberOfNodes-1, "from rest", len(candidates), "node candidates")
err = errors.New("Not enough data node found!")
}
return
}
func (n *NodeImpl) IsDataNode() bool {
return n.nodeType == "DataNode"
}
func (n *NodeImpl) IsRack() bool {
return n.nodeType == "Rack"
}
func (n *NodeImpl) IsDataCenter() bool {
return n.nodeType == "DataCenter"
}
func (n *NodeImpl) String() string {
if n.parent != nil {
return n.parent.String() + ":" + string(n.id)
}
return string(n.id)
}
func (n *NodeImpl) Id() NodeId {
return n.id
}
func (n *NodeImpl) FreeSpace() int {
return n.maxVolumeCount - n.volumeCount
}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
func (n *NodeImpl) Children() (ret []Node) {
n.RLock()
defer n.RUnlock()
for _, c := range n.children {
ret = append(ret, c)
}
return ret
}
func (n *NodeImpl) Parent() Node {
return n.parent
}
func (n *NodeImpl) GetValue() interface{} {
return n.value
}
func (n *NodeImpl) ReserveOneVolume(r int) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
freeSpace := node.FreeSpace()
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
}
if r >= freeSpace {
r -= freeSpace
} else {
if node.IsDataNode() && node.FreeSpace() > 0 {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
return node.(*DataNode), nil
}
assignedNode, err = node.ReserveOneVolume(r)
if err != nil {
return
}
}
}
return
}
func (n *NodeImpl) UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta int) { //can be negative
n.maxVolumeCount += maxVolumeCountDelta
if n.parent != nil {
n.parent.UpAdjustMaxVolumeCountDelta(maxVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustVolumeCountDelta(volumeCountDelta int) { //can be negative
n.volumeCount += volumeCountDelta
if n.parent != nil {
n.parent.UpAdjustVolumeCountDelta(volumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta int) { //can be negative
n.activeVolumeCount += activeVolumeCountDelta
if n.parent != nil {
n.parent.UpAdjustActiveVolumeCountDelta(activeVolumeCountDelta)
}
}
func (n *NodeImpl) UpAdjustMaxVolumeId(vid storage.VolumeId) { //can be negative
if n.maxVolumeId < vid {
n.maxVolumeId = vid
if n.parent != nil {
n.parent.UpAdjustMaxVolumeId(vid)
}
}
}
func (n *NodeImpl) GetMaxVolumeId() storage.VolumeId {
return n.maxVolumeId
}
func (n *NodeImpl) GetVolumeCount() int {
return n.volumeCount
}
func (n *NodeImpl) GetActiveVolumeCount() int {
return n.activeVolumeCount
}
func (n *NodeImpl) GetMaxVolumeCount() int {
return n.maxVolumeCount
}
func (n *NodeImpl) LinkChildNode(node Node) {
n.Lock()
defer n.Unlock()
if n.children[node.Id()] == nil {
n.children[node.Id()] = node
n.UpAdjustMaxVolumeCountDelta(node.GetMaxVolumeCount())
n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
n.UpAdjustVolumeCountDelta(node.GetVolumeCount())
n.UpAdjustActiveVolumeCountDelta(node.GetActiveVolumeCount())
node.SetParent(n)
glog.V(0).Infoln(n, "adds child", node.Id())
}
}
func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
n.Lock()
defer n.Unlock()
node := n.children[nodeId]
if node != nil {
node.SetParent(nil)
delete(n.children, node.Id())
n.UpAdjustVolumeCountDelta(-node.GetVolumeCount())
n.UpAdjustActiveVolumeCountDelta(-node.GetActiveVolumeCount())
n.UpAdjustMaxVolumeCountDelta(-node.GetMaxVolumeCount())
glog.V(0).Infoln(n, "removes", node, "volumeCount =", n.activeVolumeCount)
}
}
func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64) {
if n.IsRack() {
for _, c := range n.Children() {
dn := c.(*DataNode) //can not cast n to DataNode
if dn.LastSeen < freshThreshHold {
if !dn.Dead {
dn.Dead = true
n.GetTopology().chanDeadDataNodes <- dn
}
}
for _, v := range dn.GetVolumes() {
if uint64(v.Size) >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
n.GetTopology().chanFullVolumes <- v
}
}
}
} else {
for _, c := range n.Children() {
c.CollectDeadNodeAndFullVolumes(freshThreshHold, volumeSizeLimit)
}
}
}
func (n *NodeImpl) GetTopology() *Topology {
var p Node
p = n
for p.Parent() != nil {
p = p.Parent()
}
return p.GetValue().(*Topology)
}

weed/topology/rack.go

@@ -0,0 +1,65 @@
package topology
import (
"strconv"
"time"
)
type Rack struct {
NodeImpl
}
func NewRack(id string) *Rack {
r := &Rack{}
r.id = NodeId(id)
r.nodeType = "Rack"
r.children = make(map[NodeId]Node)
r.NodeImpl.value = r
return r
}
func (r *Rack) FindDataNode(ip string, port int) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
return dn
}
}
return nil
}
func (r *Rack) GetOrCreateDataNode(ip string, port int, publicUrl string, maxVolumeCount int) *DataNode {
for _, c := range r.Children() {
dn := c.(*DataNode)
if dn.MatchLocation(ip, port) {
dn.LastSeen = time.Now().Unix()
if dn.Dead {
dn.Dead = false
r.GetTopology().chanRecoveredDataNodes <- dn
dn.UpAdjustMaxVolumeCountDelta(maxVolumeCount - dn.maxVolumeCount)
}
return dn
}
}
dn := NewDataNode(ip + ":" + strconv.Itoa(port))
dn.Ip = ip
dn.Port = port
dn.PublicUrl = publicUrl
dn.maxVolumeCount = maxVolumeCount
dn.LastSeen = time.Now().Unix()
r.LinkChildNode(dn)
return dn
}
func (r *Rack) ToMap() interface{} {
m := make(map[string]interface{})
m["Id"] = r.Id()
m["Max"] = r.GetMaxVolumeCount()
m["Free"] = r.FreeSpace()
var dns []interface{}
for _, c := range r.Children() {
dn := c.(*DataNode)
dns = append(dns, dn.ToMap())
}
m["DataNodes"] = dns
return m
}


@@ -0,0 +1,150 @@
package topology
import (
"bytes"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"net/url"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
func ReplicatedWrite(masterNode string, s *storage.Store,
volumeId storage.VolumeId, needle *storage.Needle,
r *http.Request) (size uint32, errorStatus string) {
//check JWT
jwt := security.GetJwt(r)
ret, err := s.Write(volumeId, needle)
needToReplicate := !s.HasVolume(volumeId)
if err != nil {
errorStatus = "Failed to write to local disk (" + err.Error() + ")"
} else if ret > 0 {
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
} else {
errorStatus = "Failed to write to local disk"
}
if !needToReplicate && ret > 0 {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if err = distributedOperation(masterNode, s, volumeId, func(location operation.Location) error {
u := url.URL{
Scheme: "http",
Host: location.Url,
Path: r.URL.Path,
}
q := url.Values{
"type": {"replicate"},
}
if needle.LastModified > 0 {
q.Set("ts", strconv.FormatUint(needle.LastModified, 10))
}
if needle.IsChunkedManifest() {
q.Set("cm", "true")
}
u.RawQuery = q.Encode()
_, err := operation.Upload(u.String(),
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
jwt)
return err
}); err != nil {
ret = 0
errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
}
}
}
size = ret
return
}
func ReplicatedDelete(masterNode string, store *storage.Store,
volumeId storage.VolumeId, n *storage.Needle,
r *http.Request) (uint32, error) {
//check JWT
jwt := security.GetJwt(r)
ret, err := store.Delete(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
return ret, err
}
needToReplicate := !store.HasVolume(volumeId)
if !needToReplicate && ret > 0 {
needToReplicate = store.GetVolume(volumeId).NeedToReplicate()
}
if needToReplicate { //send to other replica locations
if r.FormValue("type") != "replicate" {
if err = distributedOperation(masterNode, store, volumeId, func(location operation.Location) error {
return util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
}); err != nil {
ret = 0
}
}
}
return ret, err
}
type DistributedOperationResult map[string]error
func (dr DistributedOperationResult) Error() error {
var errs []string
for k, v := range dr {
if v != nil {
errs = append(errs, fmt.Sprintf("[%s]: %v", k, v))
}
}
if len(errs) == 0 {
return nil
}
return errors.New(strings.Join(errs, "\n"))
}
type RemoteResult struct {
Host string
Error error
}
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) error) error {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
results := make(chan RemoteResult)
for _, location := range lookupResult.Locations {
if location.Url != selfUrl {
length++
go func(location operation.Location, results chan RemoteResult) {
results <- RemoteResult{location.Url, op(location)}
}(location, results)
}
}
ret := DistributedOperationResult(make(map[string]error))
for i := 0; i < length; i++ {
result := <-results
ret[result.Host] = result.Error
}
if volume := store.GetVolume(volumeId); volume != nil {
if length+1 < volume.ReplicaPlacement.GetCopyCount() {
return fmt.Errorf("replicating opetations [%d] is less than volume's replication copy count [%d]", length+1, volume.ReplicaPlacement.GetCopyCount())
}
}
return ret.Error()
} else {
glog.V(0).Infoln()
return fmt.Errorf("Failed to lookup for %d: %v", volumeId, lookupErr)
}
return nil
}


@@ -0,0 +1,17 @@
package topology
import (
"testing"
)
func TestRemoveDataCenter(t *testing.T) {
topo := setup(topologyLayout)
topo.UnlinkChildNode(NodeId("dc2"))
if topo.GetActiveVolumeCount() != 15 {
t.Fail()
}
topo.UnlinkChildNode(NodeId("dc3"))
if topo.GetActiveVolumeCount() != 12 {
t.Fail()
}
}

weed/topology/topology.go

@@ -0,0 +1,189 @@
package topology
import (
"errors"
"io/ioutil"
"math/rand"
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
type Topology struct {
NodeImpl
collectionMap *util.ConcurrentReadMap
pulse int64
volumeSizeLimit uint64
Sequence sequence.Sequencer
chanDeadDataNodes chan *DataNode
chanRecoveredDataNodes chan *DataNode
chanFullVolumes chan storage.VolumeInfo
configuration *Configuration
RaftServer raft.Server
}
func NewTopology(id string, confFile string, seq sequence.Sequencer, volumeSizeLimit uint64, pulse int) (*Topology, error) {
t := &Topology{}
t.id = NodeId(id)
t.nodeType = "Topology"
t.NodeImpl.value = t
t.children = make(map[NodeId]Node)
t.collectionMap = util.NewConcurrentReadMap()
t.pulse = int64(pulse)
t.volumeSizeLimit = volumeSizeLimit
t.Sequence = seq
t.chanDeadDataNodes = make(chan *DataNode)
t.chanRecoveredDataNodes = make(chan *DataNode)
t.chanFullVolumes = make(chan storage.VolumeInfo)
err := t.loadConfiguration(confFile)
return t, err
}
func (t *Topology) IsLeader() bool {
if leader, e := t.Leader(); e == nil {
return leader == t.RaftServer.Name()
}
return false
}
func (t *Topology) Leader() (string, error) {
l := ""
if t.RaftServer != nil {
l = t.RaftServer.Leader()
} else {
return "", errors.New("Raft Server not ready yet!")
}
if l == "" {
// We are a single node cluster, we are the leader
return t.RaftServer.Name(), errors.New("Raft Server not initialized!")
}
return l, nil
}
func (t *Topology) loadConfiguration(configurationFile string) error {
b, e := ioutil.ReadFile(configurationFile)
if e == nil {
t.configuration, e = NewConfiguration(b)
return e
}
glog.V(0).Infoln("Using default configurations.")
return nil
}
func (t *Topology) Lookup(collection string, vid storage.VolumeId) []*DataNode {
//maybe an issue if lots of collections?
if collection == "" {
for _, c := range t.collectionMap.Items() {
if list := c.(*Collection).Lookup(vid); list != nil {
return list
}
}
} else {
if c, ok := t.collectionMap.Find(collection); ok {
return c.(*Collection).Lookup(vid)
}
}
return nil
}
func (t *Topology) NextVolumeId() storage.VolumeId {
vid := t.GetMaxVolumeId()
next := vid.Next()
go t.RaftServer.Do(NewMaxVolumeIdCommand(next))
return next
}
func (t *Topology) HasWritableVolume(option *VolumeGrowOption) bool {
vl := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl)
return vl.GetActiveVolumeCount(option) > 0
}
func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string, uint64, *DataNode, error) {
vid, count, datanodes, err := t.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl).PickForWrite(count, option)
if err != nil || datanodes.Length() == 0 {
return "", 0, nil, errors.New("No writable volumes available!")
}
fileId, count := t.Sequence.NextFileId(count)
return storage.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
func (t *Topology) GetVolumeLayout(collectionName string, rp *storage.ReplicaPlacement, ttl *storage.TTL) *VolumeLayout {
return t.collectionMap.Get(collectionName, func() interface{} {
return NewCollection(collectionName, t.volumeSizeLimit)
}).(*Collection).GetOrCreateVolumeLayout(rp, ttl)
}
func (t *Topology) FindCollection(collectionName string) (*Collection, bool) {
c, hasCollection := t.collectionMap.Find(collectionName)
return c.(*Collection), hasCollection
}
func (t *Topology) DeleteCollection(collectionName string) {
t.collectionMap.Delete(collectionName)
}
func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).RegisterVolume(&v, dn)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
glog.Infof("removing volume info:%+v", v)
t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl).UnRegisterVolume(&v, dn)
}
func (t *Topology) ProcessJoinMessage(joinMessage *operation.JoinMessage) {
t.Sequence.SetMax(*joinMessage.MaxFileKey)
dcName, rackName := t.configuration.Locate(*joinMessage.Ip, *joinMessage.DataCenter, *joinMessage.Rack)
dc := t.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn := rack.FindDataNode(*joinMessage.Ip, int(*joinMessage.Port))
if *joinMessage.IsInit && dn != nil {
t.UnRegisterDataNode(dn)
}
dn = rack.GetOrCreateDataNode(*joinMessage.Ip,
int(*joinMessage.Port), *joinMessage.PublicUrl,
int(*joinMessage.MaxVolumeCount))
var volumeInfos []storage.VolumeInfo
for _, v := range joinMessage.Volumes {
if vi, err := storage.NewVolumeInfo(v); err == nil {
volumeInfos = append(volumeInfos, vi)
} else {
glog.V(0).Infoln("Fail to convert joined volume information:", err.Error())
}
}
deletedVolumes := dn.UpdateVolumes(volumeInfos)
for _, v := range volumeInfos {
t.RegisterVolumeLayout(v, dn)
}
for _, v := range deletedVolumes {
t.UnRegisterVolumeLayout(v, dn)
}
}
func (t *Topology) GetOrCreateDataCenter(dcName string) *DataCenter {
for _, c := range t.Children() {
dc := c.(*DataCenter)
if string(dc.Id()) == dcName {
return dc
}
}
dc := NewDataCenter(dcName)
t.LinkChildNode(dc)
return dc
}


@@ -0,0 +1,74 @@
package topology
import (
"math/rand"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
func (t *Topology) StartRefreshWritableVolumes(garbageThreshold string) {
go func() {
for {
if t.IsLeader() {
freshThreshHold := time.Now().Unix() - 3*t.pulse //3 times of sleep interval
t.CollectDeadNodeAndFullVolumes(freshThreshHold, t.volumeSizeLimit)
}
time.Sleep(time.Duration(float32(t.pulse*1e3)*(1+rand.Float32())) * time.Millisecond)
}
}()
go func(garbageThreshold string) {
c := time.Tick(15 * time.Minute)
for _ = range c {
if t.IsLeader() {
t.Vacuum(garbageThreshold)
}
}
}(garbageThreshold)
go func() {
for {
select {
case v := <-t.chanFullVolumes:
t.SetVolumeCapacityFull(v)
case dn := <-t.chanRecoveredDataNodes:
t.RegisterRecoveredDataNode(dn)
glog.V(0).Infoln("DataNode", dn, "is back alive!")
case dn := <-t.chanDeadDataNodes:
t.UnRegisterDataNode(dn)
glog.V(0).Infoln("DataNode", dn, "is dead!")
}
}
}()
}
func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
return false
}
for _, dn := range vl.vid2location[volumeInfo.Id].list {
if !volumeInfo.ReadOnly {
dn.UpAdjustActiveVolumeCountDelta(-1)
}
}
return true
}
func (t *Topology) UnRegisterDataNode(dn *DataNode) {
for _, v := range dn.GetVolumes() {
glog.V(0).Infoln("Removing Volume", v.Id, "from the dead volume server", dn)
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
vl.SetVolumeUnavailable(dn, v.Id)
}
dn.UpAdjustVolumeCountDelta(-dn.GetVolumeCount())
dn.UpAdjustActiveVolumeCountDelta(-dn.GetActiveVolumeCount())
dn.UpAdjustMaxVolumeCountDelta(-dn.GetMaxVolumeCount())
dn.Parent().UnlinkChildNode(dn.Id())
}
func (t *Topology) RegisterRecoveredDataNode(dn *DataNode) {
for _, v := range dn.GetVolumes() {
vl := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl)
if vl.isWritable(&v) {
vl.SetVolumeAvailable(dn, v.Id)
}
}
}


@@ -0,0 +1,53 @@
package topology
func (t *Topology) ToMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace()
var dcs []interface{}
for _, c := range t.Children() {
dc := c.(*DataCenter)
dcs = append(dcs, dc.ToMap())
}
m["DataCenters"] = dcs
var layouts []interface{}
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
for _, layout := range c.storageType2VolumeLayout.Items() {
if layout != nil {
tmp := layout.(*VolumeLayout).ToMap()
tmp["collection"] = c.Name
layouts = append(layouts, tmp)
}
}
}
m["layouts"] = layouts
return m
}
func (t *Topology) ToVolumeMap() interface{} {
m := make(map[string]interface{})
m["Max"] = t.GetMaxVolumeCount()
m["Free"] = t.FreeSpace()
dcs := make(map[NodeId]interface{})
for _, c := range t.Children() {
dc := c.(*DataCenter)
racks := make(map[NodeId]interface{})
for _, r := range dc.Children() {
rack := r.(*Rack)
dataNodes := make(map[NodeId]interface{})
for _, d := range rack.Children() {
dn := d.(*DataNode)
var volumes []interface{}
for _, v := range dn.GetVolumes() {
volumes = append(volumes, v)
}
dataNodes[d.Id()] = volumes
}
racks[r.Id()] = dataNodes
}
dcs[dc.Id()] = racks
}
m["DataCenters"] = dcs
return m
}


@@ -0,0 +1,158 @@
package topology
import (
"encoding/json"
"errors"
"net/url"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/util"
)
func batchVacuumVolumeCheck(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList, garbageThreshold string) bool {
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
//glog.V(0).Infoln(index, "Check vacuuming", vid, "on", dn.Url())
if e, ret := vacuumVolume_Check(url, vid, garbageThreshold); e != nil {
//glog.V(0).Infoln(index, "Error when checking vacuuming", vid, "on", url, e)
ch <- false
} else {
//glog.V(0).Infoln(index, "Checked vacuuming", vid, "on", url, "needVacuum", ret)
ch <- ret
}
}(index, dn.Url(), vid)
}
isCheckSuccess := true
for _ = range locationlist.list {
select {
case canVacuum := <-ch:
isCheckSuccess = isCheckSuccess && canVacuum
case <-time.After(30 * time.Minute):
isCheckSuccess = false
break
}
}
return isCheckSuccess
}
func batchVacuumVolumeCompact(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
vl.removeFromWritable(vid)
ch := make(chan bool, locationlist.Length())
for index, dn := range locationlist.list {
go func(index int, url string, vid storage.VolumeId) {
glog.V(0).Infoln(index, "Start vacuuming", vid, "on", url)
if e := vacuumVolume_Compact(url, vid); e != nil {
glog.V(0).Infoln(index, "Error when vacuuming", vid, "on", url, e)
ch <- false
} else {
glog.V(0).Infoln(index, "Complete vacuuming", vid, "on", url)
ch <- true
}
}(index, dn.Url(), vid)
}
isVacuumSuccess := true
for _ = range locationlist.list {
select {
case _ = <-ch:
case <-time.After(30 * time.Minute):
isVacuumSuccess = false
break
}
}
return isVacuumSuccess
}
func batchVacuumVolumeCommit(vl *VolumeLayout, vid storage.VolumeId, locationlist *VolumeLocationList) bool {
isCommitSuccess := true
for _, dn := range locationlist.list {
glog.V(0).Infoln("Start Commiting vacuum", vid, "on", dn.Url())
if e := vacuumVolume_Commit(dn.Url(), vid); e != nil {
glog.V(0).Infoln("Error when committing vacuum", vid, "on", dn.Url(), e)
isCommitSuccess = false
} else {
glog.V(0).Infoln("Complete Commiting vacuum", vid, "on", dn.Url())
}
if isCommitSuccess {
vl.SetVolumeAvailable(dn, vid)
}
}
return isCommitSuccess
}
func (t *Topology) Vacuum(garbageThreshold string) int {
glog.V(0).Infoln("Start vacuum on demand")
for _, col := range t.collectionMap.Items() {
c := col.(*Collection)
glog.V(0).Infoln("vacuum on collection:", c.Name)
for _, vl := range c.storageType2VolumeLayout.Items() {
if vl != nil {
volumeLayout := vl.(*VolumeLayout)
for vid, locationlist := range volumeLayout.vid2location {
glog.V(0).Infoln("vacuum on collection:", c.Name, "volume", vid)
if batchVacuumVolumeCheck(volumeLayout, vid, locationlist, garbageThreshold) {
if batchVacuumVolumeCompact(volumeLayout, vid, locationlist) {
batchVacuumVolumeCommit(volumeLayout, vid, locationlist)
}
}
}
}
}
}
return 0
}
type VacuumVolumeResult struct {
Result bool
Error string
}
func vacuumVolume_Check(urlLocation string, vid storage.VolumeId, garbageThreshold string) (error, bool) {
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("garbageThreshold", garbageThreshold)
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/check", values)
if err != nil {
glog.V(0).Infoln("parameters:", values)
return err, false
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err, false
}
if ret.Error != "" {
return errors.New(ret.Error), false
}
return nil, ret.Result
}
func vacuumVolume_Compact(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/compact", values)
if err != nil {
return err
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}
func vacuumVolume_Commit(urlLocation string, vid storage.VolumeId) error {
values := make(url.Values)
values.Add("volume", vid.String())
jsonBlob, err := util.Post("http://"+urlLocation+"/admin/vacuum/commit", values)
if err != nil {
return err
}
var ret VacuumVolumeResult
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
return err
}
if ret.Error != "" {
return errors.New(ret.Error)
}
return nil
}


@@ -0,0 +1,211 @@
package topology
import (
"fmt"
"math/rand"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
/*
This package is created to resolve these replica placement issues:
1. growth factor for each replica level, e.g., add 10 volumes for 1 copy, 20 volumes for 2 copies, 30 volumes for 3 copies
2. in time of tight storage, how to reduce replica level
3. optimizing for hot data on faster disk, cold data on cheaper storage,
4. volume allocation for each bucket
*/
type VolumeGrowOption struct {
Collection string
ReplicaPlacement *storage.ReplicaPlacement
Ttl *storage.TTL
DataCenter string
Rack string
DataNode string
}
type VolumeGrowth struct {
accessLock sync.Mutex
}
func (o *VolumeGrowOption) String() string {
return fmt.Sprintf("Collection:%s, ReplicaPlacement:%v, Ttl:%v, DataCenter:%s, Rack:%s, DataNode:%s", o.Collection, o.ReplicaPlacement, o.Ttl, o.DataCenter, o.Rack, o.DataNode)
}
func NewDefaultVolumeGrowth() *VolumeGrowth {
return &VolumeGrowth{}
}
// one replication type may need rp.GetCopyCount() actual volumes
// given copyCount, how many logical volumes to create
func (vg *VolumeGrowth) findVolumeCount(copyCount int) (count int) {
switch copyCount {
case 1:
count = 7
case 2:
count = 6
case 3:
count = 3
default:
count = 1
}
return
}
func (vg *VolumeGrowth) AutomaticGrowByType(option *VolumeGrowOption, topo *Topology) (count int, err error) {
count, err = vg.GrowByCountAndType(vg.findVolumeCount(option.ReplicaPlacement.GetCopyCount()), option, topo)
if count > 0 && count%option.ReplicaPlacement.GetCopyCount() == 0 {
return count, nil
}
return count, err
}
func (vg *VolumeGrowth) GrowByCountAndType(targetCount int, option *VolumeGrowOption, topo *Topology) (counter int, err error) {
vg.accessLock.Lock()
defer vg.accessLock.Unlock()
for i := 0; i < targetCount; i++ {
if c, e := vg.findAndGrow(topo, option); e == nil {
counter += c
} else {
return counter, e
}
}
return
}
func (vg *VolumeGrowth) findAndGrow(topo *Topology, option *VolumeGrowOption) (int, error) {
servers, e := vg.findEmptySlotsForOneVolume(topo, option)
if e != nil {
return 0, e
}
vid := topo.NextVolumeId()
err := vg.grow(topo, vid, option, servers...)
return len(servers), err
}
// 1. find the main data node
// 1.1 collect all data nodes that have at least one free slot
// 1.2 collect all racks that have rp.SameRackCount+1 free slots
// 1.3 collect all data centers that have rp.DiffRackCount+rp.SameRackCount+1 free slots
// 2. find the rest of the data nodes
func (vg *VolumeGrowth) findEmptySlotsForOneVolume(topo *Topology, option *VolumeGrowOption) (servers []*DataNode, err error) {
//find main datacenter and other data centers
rp := option.ReplicaPlacement
mainDataCenter, otherDataCenters, dc_err := topo.RandomlyPickNodes(rp.DiffDataCenterCount+1, func(node Node) error {
if option.DataCenter != "" && node.IsDataCenter() && node.Id() != NodeId(option.DataCenter) {
return fmt.Errorf("Not matching preferred data center:%s", option.DataCenter)
}
if len(node.Children()) < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks, not enough for %d.", len(node.Children()), rp.DiffRackCount+1)
}
if node.FreeSpace() < rp.DiffRackCount+rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.DiffRackCount+rp.SameRackCount+1)
}
possibleRacksCount := 0
for _, rack := range node.Children() {
possibleDataNodesCount := 0
for _, n := range rack.Children() {
if n.FreeSpace() >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount >= rp.SameRackCount+1 {
possibleRacksCount++
}
}
if possibleRacksCount < rp.DiffRackCount+1 {
return fmt.Errorf("Only has %d racks with more than %d free data nodes, not enough for %d.", possibleRacksCount, rp.SameRackCount+1, rp.DiffRackCount+1)
}
return nil
})
if dc_err != nil {
return nil, dc_err
}
//find main rack and other racks
mainRack, otherRacks, rack_err := mainDataCenter.(*DataCenter).RandomlyPickNodes(rp.DiffRackCount+1, func(node Node) error {
if option.Rack != "" && node.IsRack() && node.Id() != NodeId(option.Rack) {
return fmt.Errorf("Not matching preferred rack:%s", option.Rack)
}
if node.FreeSpace() < rp.SameRackCount+1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), rp.SameRackCount+1)
}
if len(node.Children()) < rp.SameRackCount+1 {
// a bit faster way to test free racks
return fmt.Errorf("Only has %d data nodes, not enough for %d.", len(node.Children()), rp.SameRackCount+1)
}
possibleDataNodesCount := 0
for _, n := range node.Children() {
if n.FreeSpace() >= 1 {
possibleDataNodesCount++
}
}
if possibleDataNodesCount < rp.SameRackCount+1 {
return fmt.Errorf("Only has %d data nodes with a slot, not enough for %d.", possibleDataNodesCount, rp.SameRackCount+1)
}
return nil
})
if rack_err != nil {
return nil, rack_err
}
//find main rack and other racks
mainServer, otherServers, server_err := mainRack.(*Rack).RandomlyPickNodes(rp.SameRackCount+1, func(node Node) error {
if option.DataNode != "" && node.IsDataNode() && node.Id() != NodeId(option.DataNode) {
return fmt.Errorf("Not matching preferred data node:%s", option.DataNode)
}
if node.FreeSpace() < 1 {
return fmt.Errorf("Free:%d < Expected:%d", node.FreeSpace(), 1)
}
return nil
})
if server_err != nil {
return nil, server_err
}
servers = append(servers, mainServer.(*DataNode))
for _, server := range otherServers {
servers = append(servers, server.(*DataNode))
}
for _, rack := range otherRacks {
r := rand.Intn(rack.FreeSpace())
if server, e := rack.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
return servers, e
}
}
for _, datacenter := range otherDataCenters {
r := rand.Intn(datacenter.FreeSpace())
if server, e := datacenter.ReserveOneVolume(r); e == nil {
servers = append(servers, server)
} else {
return servers, e
}
}
return
}
func (vg *VolumeGrowth) grow(topo *Topology, vid storage.VolumeId, option *VolumeGrowOption, servers ...*DataNode) error {
for _, server := range servers {
if err := AllocateVolume(server, vid, option); err == nil {
vi := storage.VolumeInfo{
Id: vid,
Size: 0,
Collection: option.Collection,
ReplicaPlacement: option.ReplicaPlacement,
Ttl: option.Ttl,
Version: storage.CurrentVersion,
}
server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(vi, server)
glog.V(0).Infoln("Created Volume", vid, "on", server.NodeImpl.String())
} else {
glog.V(0).Infoln("Failed to assign volume", vid, "to", servers, "error", err)
return fmt.Errorf("Failed to assign %d: %v", vid, err)
}
}
return nil
}
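To show how the pieces above fit together, here is a small hypothetical helper, not part of this commit, that drives the grower end to end; the "002" placement and the dc1 preference are borrowed from the test that follows, and the *Topology value is assumed to come from NewTopology:

package topology

import (
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
)

// growExample is a hypothetical sketch: grow writable volumes with replica
// placement "002" (two extra copies on the same rack), preferring data center dc1.
func growExample(topo *Topology) (int, error) {
	rp, err := storage.NewReplicaPlacementFromString("002")
	if err != nil {
		return 0, err
	}
	option := &VolumeGrowOption{
		Collection:       "",
		ReplicaPlacement: rp,
		DataCenter:       "dc1",
	}
	vg := NewDefaultVolumeGrowth()
	// AutomaticGrowByType picks empty slots via findEmptySlotsForOneVolume
	// and calls AllocateVolume on every chosen data node.
	count, err := vg.AutomaticGrowByType(option, topo)
	glog.V(0).Infoln("grew", count, "volumes for", option.String())
	return count, err
}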


@@ -0,0 +1,135 @@
package topology
import (
"encoding/json"
"fmt"
"testing"
"github.com/chrislusf/seaweedfs/weed/sequence"
"github.com/chrislusf/seaweedfs/weed/storage"
)
var topologyLayout = `
{
"dc1":{
"rack1":{
"server111":{
"volumes":[
{"id":1, "size":12312},
{"id":2, "size":12312},
{"id":3, "size":12312}
],
"limit":3
},
"server112":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":10
}
},
"rack2":{
"server121":{
"volumes":[
{"id":4, "size":12312},
{"id":5, "size":12312},
{"id":6, "size":12312}
],
"limit":4
},
"server122":{
"volumes":[],
"limit":4
},
"server123":{
"volumes":[
{"id":2, "size":12312},
{"id":3, "size":12312},
{"id":4, "size":12312}
],
"limit":5
}
}
},
"dc2":{
},
"dc3":{
"rack2":{
"server321":{
"volumes":[
{"id":1, "size":12312},
{"id":3, "size":12312},
{"id":5, "size":12312}
],
"limit":4
}
}
}
}
`
func setup(topologyLayout string) *Topology {
var data interface{}
err := json.Unmarshal([]byte(topologyLayout), &data)
if err != nil {
fmt.Println("error:", err)
}
fmt.Println("data:", data)
//need to connect all nodes first before server adding volumes
topo, err := NewTopology("weedfs", "/etc/weedfs/weedfs.conf",
sequence.NewMemorySequencer(), 32*1024, 5)
if err != nil {
panic("error: " + err.Error())
}
mTopology := data.(map[string]interface{})
for dcKey, dcValue := range mTopology {
dc := NewDataCenter(dcKey)
dcMap := dcValue.(map[string]interface{})
topo.LinkChildNode(dc)
for rackKey, rackValue := range dcMap {
rack := NewRack(rackKey)
rackMap := rackValue.(map[string]interface{})
dc.LinkChildNode(rack)
for serverKey, serverValue := range rackMap {
server := NewDataNode(serverKey)
serverMap := serverValue.(map[string]interface{})
rack.LinkChildNode(server)
for _, v := range serverMap["volumes"].([]interface{}) {
m := v.(map[string]interface{})
vi := storage.VolumeInfo{
Id: storage.VolumeId(int64(m["id"].(float64))),
Size: uint64(m["size"].(float64)),
Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi)
}
server.UpAdjustMaxVolumeCountDelta(int(serverMap["limit"].(float64)))
}
}
}
return topo
}
func TestFindEmptySlotsForOneVolume(t *testing.T) {
topo := setup(topologyLayout)
vg := NewDefaultVolumeGrowth()
rp, _ := storage.NewReplicaPlacementFromString("002")
volumeGrowOption := &VolumeGrowOption{
Collection: "",
ReplicaPlacement: rp,
DataCenter: "dc1",
Rack: "",
DataNode: "",
}
servers, err := vg.findEmptySlotsForOneVolume(topo, volumeGrowOption)
if err != nil {
fmt.Println("finding empty slots error :", err)
t.Fail()
}
for _, server := range servers {
fmt.Println("assigned node :", server.Id())
}
}


@@ -0,0 +1,226 @@
package topology
import (
"errors"
"fmt"
"math/rand"
"sync"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage"
)
// mapping from volume to its locations, inverted from server to volume
type VolumeLayout struct {
rp *storage.ReplicaPlacement
ttl *storage.TTL
vid2location map[storage.VolumeId]*VolumeLocationList
writables []storage.VolumeId // transient array of writable volume id
volumeSizeLimit uint64
accessLock sync.RWMutex
}
func NewVolumeLayout(rp *storage.ReplicaPlacement, ttl *storage.TTL, volumeSizeLimit uint64) *VolumeLayout {
return &VolumeLayout{
rp: rp,
ttl: ttl,
vid2location: make(map[storage.VolumeId]*VolumeLocationList),
writables: *new([]storage.VolumeId),
volumeSizeLimit: volumeSizeLimit,
}
}
func (vl *VolumeLayout) String() string {
return fmt.Sprintf("rp:%v, ttl:%v, vid2location:%v, writables:%v, volumeSizeLimit:%v", vl.rp, vl.ttl, vl.vid2location, vl.writables, vl.volumeSizeLimit)
}
func (vl *VolumeLayout) RegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if _, ok := vl.vid2location[v.Id]; !ok {
vl.vid2location[v.Id] = NewVolumeLocationList()
}
vl.vid2location[v.Id].Set(dn)
glog.V(4).Infoln("volume", v.Id, "added to dn", dn.Id(), "len", vl.vid2location[v.Id].Length(), "copy", v.ReplicaPlacement.GetCopyCount())
if vl.vid2location[v.Id].Length() == vl.rp.GetCopyCount() && vl.isWritable(v) {
vl.addToWritable(v.Id)
} else {
vl.removeFromWritable(v.Id)
}
}
func (vl *VolumeLayout) UnRegisterVolume(v *storage.VolumeInfo, dn *DataNode) {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.removeFromWritable(v.Id)
delete(vl.vid2location, v.Id)
}
func (vl *VolumeLayout) addToWritable(vid storage.VolumeId) {
for _, id := range vl.writables {
if vid == id {
return
}
}
vl.writables = append(vl.writables, vid)
}
func (vl *VolumeLayout) isWritable(v *storage.VolumeInfo) bool {
return uint64(v.Size) < vl.volumeSizeLimit &&
v.Version == storage.CurrentVersion &&
!v.ReadOnly
}
func (vl *VolumeLayout) Lookup(vid storage.VolumeId) []*DataNode {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
if location := vl.vid2location[vid]; location != nil {
return location.list
}
return nil
}
func (vl *VolumeLayout) ListVolumeServers() (nodes []*DataNode) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
for _, location := range vl.vid2location {
nodes = append(nodes, location.list...)
}
return
}
func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*storage.VolumeId, uint64, *VolumeLocationList, error) {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
len_writers := len(vl.writables)
if len_writers <= 0 {
glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
if option.DataCenter == "" {
vid := vl.writables[rand.Intn(len_writers)]
locationList := vl.vid2location[vid]
if locationList != nil {
return &vid, count, locationList, nil
}
return nil, 0, nil, errors.New("Strangely vid " + vid.String() + " is on no machine!")
}
var vid storage.VolumeId
var locationList *VolumeLocationList
counter := 0
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
continue
}
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
continue
}
counter++
if rand.Intn(counter) < 1 {
vid, locationList = v, volumeLocationList
}
}
}
}
return &vid, count, locationList, nil
}
func (vl *VolumeLayout) GetActiveVolumeCount(option *VolumeGrowOption) int {
vl.accessLock.RLock()
defer vl.accessLock.RUnlock()
if option.DataCenter == "" {
return len(vl.writables)
}
counter := 0
for _, v := range vl.writables {
for _, dn := range vl.vid2location[v].list {
if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
continue
}
if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
continue
}
counter++
}
}
}
return counter
}
func (vl *VolumeLayout) removeFromWritable(vid storage.VolumeId) bool {
toDeleteIndex := -1
for k, id := range vl.writables {
if id == vid {
toDeleteIndex = k
break
}
}
if toDeleteIndex >= 0 {
glog.V(0).Infoln("Volume", vid, "becomes unwritable")
vl.writables = append(vl.writables[0:toDeleteIndex], vl.writables[toDeleteIndex+1:]...)
return true
}
return false
}
func (vl *VolumeLayout) setVolumeWritable(vid storage.VolumeId) bool {
for _, v := range vl.writables {
if v == vid {
return false
}
}
glog.V(0).Infoln("Volume", vid, "becomes writable")
vl.writables = append(vl.writables, vid)
return true
}
func (vl *VolumeLayout) SetVolumeUnavailable(dn *DataNode, vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
if location, ok := vl.vid2location[vid]; ok {
if location.Remove(dn) {
if location.Length() < vl.rp.GetCopyCount() {
glog.V(0).Infoln("Volume", vid, "has", location.Length(), "replica, less than required", vl.rp.GetCopyCount())
return vl.removeFromWritable(vid)
}
}
}
return false
}
func (vl *VolumeLayout) SetVolumeAvailable(dn *DataNode, vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
vl.vid2location[vid].Set(dn)
if vl.vid2location[vid].Length() >= vl.rp.GetCopyCount() {
return vl.setVolumeWritable(vid)
}
return false
}
func (vl *VolumeLayout) SetVolumeCapacityFull(vid storage.VolumeId) bool {
vl.accessLock.Lock()
defer vl.accessLock.Unlock()
// glog.V(0).Infoln("Volume", vid, "reaches full capacity.")
return vl.removeFromWritable(vid)
}
func (vl *VolumeLayout) ToMap() map[string]interface{} {
m := make(map[string]interface{})
m["replication"] = vl.rp.String()
m["ttl"] = vl.ttl.String()
m["writables"] = vl.writables
//m["locations"] = vl.vid2location
return m
}


@@ -0,0 +1,65 @@
package topology
import (
"fmt"
)
type VolumeLocationList struct {
list []*DataNode
}
func NewVolumeLocationList() *VolumeLocationList {
return &VolumeLocationList{}
}
func (dnll *VolumeLocationList) String() string {
return fmt.Sprintf("%v", dnll.list)
}
func (dnll *VolumeLocationList) Head() *DataNode {
//mark first node as master volume
return dnll.list[0]
}
func (dnll *VolumeLocationList) Length() int {
return len(dnll.list)
}
func (dnll *VolumeLocationList) Set(loc *DataNode) {
for i := 0; i < len(dnll.list); i++ {
if loc.Ip == dnll.list[i].Ip && loc.Port == dnll.list[i].Port {
dnll.list[i] = loc
return
}
}
dnll.list = append(dnll.list, loc)
}
func (dnll *VolumeLocationList) Remove(loc *DataNode) bool {
for i, dnl := range dnll.list {
if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
dnll.list = append(dnll.list[:i], dnll.list[i+1:]...)
return true
}
}
return false
}
func (dnll *VolumeLocationList) Refresh(freshThreshHold int64) {
var changed bool
for _, dnl := range dnll.list {
if dnl.LastSeen < freshThreshHold {
changed = true
break
}
}
if changed {
var l []*DataNode
for _, dnl := range dnll.list {
if dnl.LastSeen >= freshThreshHold {
l = append(l, dnl)
}
}
dnll.list = l
}
}