now any data node can accept storing files
files are automatically copied to replicas
This commit is contained in:
@@ -71,7 +71,7 @@ func upload(filename string, server string, fid string) (int) {
|
|||||||
}
|
}
|
||||||
panic(err.Error())
|
panic(err.Error())
|
||||||
}
|
}
|
||||||
ret, _ := operation.Upload(server, fid, filename, fh)
|
ret, _ := operation.Upload("http://"+server+"/"+fid, filename, fh)
|
||||||
return ret.Size
|
return ret.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
@@ -127,27 +128,29 @@ func GetHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Write(n.Data)
|
w.Write(n.Data)
|
||||||
}
|
}
|
||||||
func PostHandler(w http.ResponseWriter, r *http.Request) {
|
func PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
r.ParseForm()
|
||||||
vid, _, _ := parseURLPath(r.URL.Path)
|
vid, _, _ := parseURLPath(r.URL.Path)
|
||||||
volumeId, e := storage.NewVolumeId(vid)
|
volumeId, e := storage.NewVolumeId(vid)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
writeJson(w, r, e)
|
writeJson(w, r, e)
|
||||||
} else {
|
} else {
|
||||||
needle, ne := storage.NewNeedle(r)
|
needle, filename, ne := storage.NewNeedle(r)
|
||||||
if ne != nil {
|
if ne != nil {
|
||||||
writeJson(w, r, ne)
|
writeJson(w, r, ne)
|
||||||
} else {
|
} else {
|
||||||
ret := store.Write(volumeId, needle)
|
ret := store.Write(volumeId, needle)
|
||||||
if ret > 0 { //send to other replica locations
|
if ret > 0 || !store.HasVolume(volumeId){ //send to other replica locations
|
||||||
if r.FormValue("type") != "standard" {
|
if r.FormValue("type") != "standard" {
|
||||||
waitTime, err := strconv.Atoi(r.FormValue("wait"))
|
waitTime, err := strconv.Atoi(r.FormValue("wait"))
|
||||||
lookupResult, lookupErr := operation.Lookup(*server, volumeId)
|
lookupResult, lookupErr := operation.Lookup(*server, volumeId)
|
||||||
if lookupErr == nil {
|
if lookupErr == nil {
|
||||||
sendFunc := func(background bool) {
|
sendFunc := func(background bool) {
|
||||||
postContentFunc := func(location operation.Location) bool {
|
postContentFunc := func(location operation.Location) bool {
|
||||||
|
operation.Upload("http://"+location.PublicUrl+r.URL.Path+"?type=standard", filename, bytes.NewReader(needle.Data))
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
for _, location := range lookupResult.Locations {
|
for _, location := range lookupResult.Locations {
|
||||||
|
if location.PublicUrl != *publicUrl {
|
||||||
if background {
|
if background {
|
||||||
go postContentFunc(location)
|
go postContentFunc(location)
|
||||||
} else {
|
} else {
|
||||||
@@ -155,6 +158,7 @@ func PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
sendFunc(err == nil && waitTime > 0)
|
sendFunc(err == nil && waitTime > 0)
|
||||||
} else {
|
} else {
|
||||||
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
|
log.Println("Failed to lookup for", volumeId, lookupErr.Error())
|
||||||
|
|||||||
@@ -14,14 +14,14 @@ type UploadResult struct {
|
|||||||
Size int
|
Size int
|
||||||
}
|
}
|
||||||
|
|
||||||
func Upload(server string, fid string, filename string, reader io.Reader) (*UploadResult, error) {
|
func Upload(uploadUrl string, filename string, reader io.Reader) (*UploadResult, error) {
|
||||||
body_buf := bytes.NewBufferString("")
|
body_buf := bytes.NewBufferString("")
|
||||||
body_writer := multipart.NewWriter(body_buf)
|
body_writer := multipart.NewWriter(body_buf)
|
||||||
file_writer, err := body_writer.CreateFormFile("file", filename)
|
file_writer, err := body_writer.CreateFormFile("file", filename)
|
||||||
io.Copy(file_writer, reader)
|
io.Copy(file_writer, reader)
|
||||||
content_type := body_writer.FormDataContentType()
|
content_type := body_writer.FormDataContentType()
|
||||||
body_writer.Close()
|
body_writer.Close()
|
||||||
resp, err := http.Post("http://"+server+"/"+fid, content_type, body_buf)
|
resp, err := http.Post(uploadUrl, content_type, body_buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package storage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"mime"
|
"mime"
|
||||||
@@ -21,17 +22,17 @@ type Needle struct {
|
|||||||
Padding []byte "Aligned to 8 bytes"
|
Padding []byte "Aligned to 8 bytes"
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNeedle(r *http.Request) (n *Needle, e error) {
|
func NewNeedle(r *http.Request) (n *Needle, fname string, e error) {
|
||||||
|
|
||||||
n = new(Needle)
|
n = new(Needle)
|
||||||
form, fe := r.MultipartReader()
|
form, fe := r.MultipartReader()
|
||||||
if fe != nil {
|
if fe != nil {
|
||||||
println("MultipartReader [ERROR]", fe)
|
fmt.Println("MultipartReader [ERROR]", fe)
|
||||||
e = fe
|
e = fe
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
part, _ := form.NextPart()
|
part, _ := form.NextPart()
|
||||||
fname := part.FileName()
|
fname = part.FileName()
|
||||||
data, _ := ioutil.ReadAll(part)
|
data, _ := ioutil.ReadAll(part)
|
||||||
//log.Println("uploading file " + part.FileName())
|
//log.Println("uploading file " + part.FileName())
|
||||||
dotIndex := strings.LastIndex(fname, ".")
|
dotIndex := strings.LastIndex(fname, ".")
|
||||||
|
|||||||
Reference in New Issue
Block a user