replication related work

This commit is contained in:
Chris Lu
2012-09-12 01:07:23 -07:00
parent e4c0693b03
commit 4a7833f1bf
6 changed files with 81 additions and 23 deletions

View File

@@ -58,22 +58,26 @@ If any "assign" request comes in
2. if not found, grow the volumes with the right replication level 2. if not found, grow the volumes with the right replication level
3. return a writable volume to the user 3. return a writable volume to the user
Plan:
Step 1. implement one copy(no replication), automatically assign volume ids
Step 2. add replication
For the above operations, here are the todo list: For the above operations, here are the todo list:
for data node: for data node:
1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE 1. onStartUp, and periodically, send existing volumes and maxVolumeCount store.Join(), DONE
2. accept command to grow a volume( id + replication level) DONE 2. accept command to grow a volume( id + replication level) DONE
/admin/assign_volume?volume=some_id&replicationType=01 /admin/assign_volume?volume=some_id&replicationType=01
3. accept status for a volumeLocationList if replication > 1 DONE 3. accept setting volumeLocationList DONE
/admin/set_volume_locations?volumeLocations=[{Vid:xxx,Locations:[loc1,loc2,loc3]}] /admin/set_volume_locations_list?volumeLocationsList=[{Vid:xxx,Locations:[loc1,loc2,loc3]}]
4. for each write, pass the write to the next location 4. for each write, pass the write to the next location, (Step 2)
POST method should accept an index, like ttl, get decremented every hop POST method should accept an index, like ttl, get decremented every hop
for master: for master:
1. accept data node's report of existing volumes and maxVolumeCount 1. accept data node's report of existing volumes and maxVolumeCount ALREADY EXISTS /dir/join
2. periodically refresh for active data nodes, and adjust writable volumes 2. periodically refresh for active data nodes, and adjust writable volumes
3. send command to grow a volume(id + replication level) 3. send command to grow a volume(id + replication level) DONE
4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info 4. NOT_IMPLEMENTING: if dead/stale data nodes are found, for the affected volumes, send stale info
to other data nodes. BECAUSE the master will stop sending writes to these data nodes to other data nodes. BECAUSE the master will stop sending writes to these data nodes
5. accept lookup for volume locations ALREADY EXISTS /dir/lookup

View File

@@ -40,6 +40,9 @@ func statusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, store.Status()) writeJson(w, r, store.Status())
} }
func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug {
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"))
}
err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType")) err := store.AddVolume(r.FormValue("volume"), r.FormValue("replicationType"))
if err == nil { if err == nil {
writeJson(w, r, map[string]string{"error": ""}) writeJson(w, r, map[string]string{"error": ""})
@@ -48,9 +51,14 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
} }
} }
func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) { func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug {
log.Println("volumeLocationsList =", r.FormValue("volumeLocationsList"))
}
volumeLocationsList := new([]storage.VolumeLocations) volumeLocationsList := new([]storage.VolumeLocations)
json.Unmarshal([]byte(r.FormValue("volumeLocations")), volumeLocationsList) err := json.Unmarshal([]byte(r.FormValue("volumeLocationsList")), volumeLocationsList)
err := store.SetVolumeLocations(*volumeLocationsList) if err == nil {
err = store.SetVolumeLocations(*volumeLocationsList)
}
if err == nil { if err == nil {
writeJson(w, r, map[string]string{"error": ""}) writeJson(w, r, map[string]string{"error": ""})
} else { } else {
@@ -173,7 +181,7 @@ func runVolume(cmd *Command, args []string) bool {
http.HandleFunc("/", storeHandler) http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler) http.HandleFunc("/status", statusHandler)
http.HandleFunc("/admin/assign_volume", assignVolumeHandler) http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
http.HandleFunc("/admin/set_volume_locations", setVolumeLocationsHandler) http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler)
go func() { go func() {
for { for {

View File

@@ -0,0 +1,31 @@
package admin
import (
"encoding/json"
"errors"
"strconv"
"net/url"
"pkg/util"
"pkg/storage"
"pkg/topology"
)
type AllocateVolumeResult struct {
error string
}
func AllocateVolume(dn *topology.DataNode, vid storage.VolumeId, repType storage.ReplicationType) error{
values := make(url.Values)
values.Add("volume", vid.String())
values.Add("replicationType", repType.String())
jsonBlob := util.Post("http://"+dn.Ip+":"+strconv.Itoa(dn.Port)+"/admin/assign_volume", values)
var ret AllocateVolumeResult
err := json.Unmarshal(jsonBlob, &ret)
if err != nil {
return err
}
if ret.error != "" {
return errors.New(ret.error)
}
return nil
}

View File

@@ -33,6 +33,21 @@ func NewReplicationType(t string) ReplicationType {
} }
return Copy00 return Copy00
} }
func (r *ReplicationType) String() string {
switch *r {
case Copy00:
return "00"
case Copy01:
return "01"
case Copy10:
return "10"
case Copy11:
return "11"
case Copy20:
return "20"
}
return "00"
}
func GetReplicationLevelIndex(v *VolumeInfo) int { func GetReplicationLevelIndex(v *VolumeInfo) int {
switch v.RepType { switch v.RepType {

View File

@@ -8,9 +8,9 @@ import (
type DataNode struct { type DataNode struct {
NodeImpl NodeImpl
volumes map[storage.VolumeId]*storage.VolumeInfo volumes map[storage.VolumeId]*storage.VolumeInfo
ip string Ip string
port int Port int
publicUrl string PublicUrl string
lastSeen int64 // unix time in seconds lastSeen int64 // unix time in seconds
} }

View File

@@ -12,7 +12,7 @@ func NewDataNodeLocationList() *DataNodeLocationList {
func (dnll *DataNodeLocationList) Add(loc *DataNode) { func (dnll *DataNodeLocationList) Add(loc *DataNode) {
for _, dnl := range dnll.list { for _, dnl := range dnll.list {
if loc.ip == dnl.ip && loc.port == dnl.port { if loc.Ip == dnl.Ip && loc.Port == dnl.Port {
break break
} }
} }