can auto grow 00 and 01 replication volumes

This commit is contained in:
Chris Lu
2012-09-16 17:31:15 -07:00
parent e7c4ee1c64
commit 9b99240584
18 changed files with 364 additions and 188 deletions

View File

@@ -5,7 +5,9 @@ import (
"log"
"net/http"
"pkg/directory"
"pkg/replication"
"pkg/storage"
"pkg/topology"
"strconv"
"strings"
"time"
@@ -29,11 +31,14 @@ var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings")
capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold")
mapper *directory.Mapper
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
)
var mapper *directory.Mapper
var topo *topology.Topology
var vg *replication.VolumeGrowth
func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
vid := r.FormValue("volumeId")
commaSep := strings.Index(vid, ",")
@@ -43,10 +48,10 @@ func dirLookupHandler(w http.ResponseWriter, r *http.Request) {
volumeId, _ := storage.NewVolumeId(vid)
machines, e := mapper.Get(volumeId)
if e == nil {
ret:= []map[string]string{}
for _, machine := range machines {
ret = append(ret,map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
}
ret := []map[string]string{}
for _, machine := range machines {
ret = append(ret, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl})
}
writeJson(w, r, ret)
} else {
log.Println("Invalid volume id", volumeId)
@@ -62,7 +67,27 @@ func dirAssignHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, map[string]string{"error": err.Error()})
}
}
func dirAssign2Handler(w http.ResponseWriter, r *http.Request) {
c, _ := strconv.Atoi(r.FormValue("count"))
rt := storage.NewReplicationType(r.FormValue("replication"))
if topo.GetVolumeLayout(rt).GetActiveVolumeCount() <= 0 {
if topo.FreeSpace() <= 0 {
writeJson(w, r, map[string]string{"error": "No free volumes left!"})
} else {
vg.GrowByType(rt, topo)
}
}
fid, count, dn, err := topo.PickForWrite(rt, c)
if err == nil {
writeJson(w, r, map[string]interface{}{"fid": fid, "url": dn.Ip + ":" + strconv.Itoa(dn.Port), "publicUrl": dn.PublicUrl, "count": count})
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
}
func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
ip := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")]
port, _ := strconv.Atoi(r.FormValue("port"))
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount"))
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port")
publicUrl := r.FormValue("publicUrl")
volumes := new([]storage.VolumeInfo)
@@ -71,20 +96,40 @@ func dirJoinHandler(w http.ResponseWriter, r *http.Request) {
log.Println(s, "volumes", r.FormValue("volumes"))
}
mapper.Add(directory.NewMachine(s, publicUrl, *volumes, time.Now().Unix()))
//new ways
topo.RegisterVolumes(*volumes, ip, port, publicUrl, maxVolumeCount)
}
func dirStatusHandler(w http.ResponseWriter, r *http.Request) {
func dirOldStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, mapper)
}
func dirNewStatusHandler(w http.ResponseWriter, r *http.Request) {
writeJson(w, r, topo.ToMap())
}
func volumeGrowHandler(w http.ResponseWriter, r *http.Request) {
rt := storage.NewReplicationType(r.FormValue("replication"))
count, err := strconv.Atoi(r.FormValue("count"))
if err != nil {
vg.GrowByType(rt, topo)
} else {
vg.GrowByCountAndType(count, rt, topo)
}
}
func runMaster(cmd *Command, args []string) bool {
topo = topology.NewTopology("topo", *metaFolder, "toposequence", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
vg = replication.NewDefaultVolumeGrowth()
log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB")
mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024, *mpulse)
http.HandleFunc("/dir/assign", dirAssignHandler)
http.HandleFunc("/dir/assign2", dirAssign2Handler)
http.HandleFunc("/dir/lookup", dirLookupHandler)
http.HandleFunc("/dir/join", dirJoinHandler)
http.HandleFunc("/dir/status", dirStatusHandler)
http.HandleFunc("/dir/status", dirOldStatusHandler)
http.HandleFunc("/dir/status2", dirNewStatusHandler) //temporary
http.HandleFunc("/vol/grow", volumeGrowHandler)
mapper.StartRefreshWritableVolumes()
mapper.StartRefreshWritableVolumes()
log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport))
e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil)

View File

@@ -39,19 +39,22 @@ type AssignResult struct {
Error string "error"
}
func assign(count int) (AssignResult, error) {
func assign(count int) (*AssignResult, error) {
values := make(url.Values)
values.Add("count", strconv.Itoa(count))
jsonBlob := util.Post("http://"+*server+"/dir/assign", values)
var ret AssignResult
err := json.Unmarshal(jsonBlob, &ret)
jsonBlob, err := util.Post("http://"+*server+"/dir/assign", values)
if err != nil {
return ret, err
return nil, err
}
var ret AssignResult
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, err
}
if ret.Count <= 0 {
return ret, errors.New(ret.Error)
return nil, errors.New(ret.Error)
}
return ret, nil
return &ret, nil
}
type UploadResult struct {

View File

@@ -6,6 +6,7 @@ import (
"math/rand"
"mime"
"net/http"
"os"
"pkg/storage"
"strconv"
"strings"
@@ -18,7 +19,7 @@ func init() {
}
var cmdVolume = &Command{
UsageLine: "volume -port=8080 -dir=/tmp -volumes=0-99 -publicUrl=server_name:8080 -mserver=localhost:9333",
UsageLine: "volume -port=8080 -dir=/tmp -min=3 -max=5 -publicUrl=server_name:8080 -mserver=localhost:9333",
Short: "start a volume server",
Long: `start a volume server to provide storage spaces
@@ -26,12 +27,13 @@ var cmdVolume = &Command{
}
var (
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
chunkFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files")
volumes = cmdVolume.Flag.String("volumes", "0,1-3,4", "comma-separated list of volume ids or range of ids")
publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
vport = cmdVolume.Flag.Int("port", 8080, "http listen port")
volumeFolder = cmdVolume.Flag.String("dir", "/tmp", "data directory to store files")
volumes = cmdVolume.Flag.String("volumes", "", "comma-separated list, or ranges of volume ids")
publicUrl = cmdVolume.Flag.String("publicUrl", "localhost:8080", "public url to serve data read")
masterNode = cmdVolume.Flag.String("mserver", "localhost:9333", "master directory server to store mappings")
vpulse = cmdVolume.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
maxVolumeCount = cmdVolume.Flag.Int("maxVolumeCount", 5, "maximum number of volumes")
store *storage.Store
)
@@ -46,9 +48,9 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) {
} else {
writeJson(w, r, map[string]string{"error": err.Error()})
}
if *IsDebug {
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
}
if *IsDebug {
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err)
}
}
func setVolumeLocationsHandler(w http.ResponseWriter, r *http.Request) {
if *IsDebug {
@@ -86,11 +88,15 @@ func GetHandler(w http.ResponseWriter, r *http.Request) {
}
cookie := n.Cookie
count, e := store.Read(volumeId, n)
if e != nil {
w.WriteHeader(404)
}
if *IsDebug {
log.Println("read bytes", count, "error", e)
}
if n.Cookie != cookie {
log.Println("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent())
w.WriteHeader(404)
return
}
if ext != "" {
@@ -175,14 +181,25 @@ func parseURLPath(path string) (vid, fid, ext string) {
}
func runVolume(cmd *Command, args []string) bool {
fileInfo, err := os.Stat(*volumeFolder)
//TODO: now default to 1G, this value should come from server?
store = storage.NewStore(*vport, *publicUrl, *chunkFolder, *volumes)
if err!=nil{
log.Fatalf("No Existing Folder:%s", *volumeFolder)
}
if !fileInfo.IsDir() {
log.Fatalf("Volume Folder should not be a file:%s", *volumeFolder)
}
perm:=fileInfo.Mode().Perm()
log.Println("Volume Folder permission:", perm)
store = storage.NewStore(*vport, *publicUrl, *volumeFolder, *maxVolumeCount, *volumes)
defer store.Close()
http.HandleFunc("/", storeHandler)
http.HandleFunc("/status", statusHandler)
http.HandleFunc("/admin/assign_volume", assignVolumeHandler)
http.HandleFunc("/admin/set_volume_locations_list", setVolumeLocationsHandler)
go func() {
for {
store.Join(*masterNode)