merge conflicts
This commit is contained in:
@@ -2,5 +2,6 @@ package operation
|
|||||||
|
|
||||||
type JoinResult struct {
|
type JoinResult struct {
|
||||||
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
|
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
|
||||||
|
SecretKey string `json:"secretKey,omitempty"`
|
||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
"github.com/chrislusf/weed-fs/go/util"
|
"github.com/chrislusf/weed-fs/go/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -16,12 +17,12 @@ type DeleteResult struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func DeleteFile(master string, fileId string) error {
|
func DeleteFile(master string, fileId string, jwt security.EncodedJwt) error {
|
||||||
fileUrl, err := LookupFileId(master, fileId)
|
fileUrl, err := LookupFileId(master, fileId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return util.Delete(fileUrl)
|
return util.Delete(fileUrl, jwt)
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseFileId(fid string) (vid string, key_cookie string, err error) {
|
func ParseFileId(fid string) (vid string, key_cookie string, err error) {
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FilePart struct {
|
type FilePart struct {
|
||||||
@@ -34,7 +35,10 @@ type SubmitResult struct {
|
|||||||
Error string `json:"error,omitempty"`
|
Error string `json:"error,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func SubmitFiles(master string, files []FilePart, replication string, collection string, ttl string, maxMB int) ([]SubmitResult, error) {
|
func SubmitFiles(master string, files []FilePart,
|
||||||
|
replication string, collection string, ttl string, maxMB int,
|
||||||
|
secret security.Secret,
|
||||||
|
) ([]SubmitResult, error) {
|
||||||
results := make([]SubmitResult, len(files))
|
results := make([]SubmitResult, len(files))
|
||||||
for index, file := range files {
|
for index, file := range files {
|
||||||
results[index].FileName = file.FileName
|
results[index].FileName = file.FileName
|
||||||
@@ -54,7 +58,7 @@ func SubmitFiles(master string, files []FilePart, replication string, collection
|
|||||||
file.Server = ret.PublicUrl
|
file.Server = ret.PublicUrl
|
||||||
file.Replication = replication
|
file.Replication = replication
|
||||||
file.Collection = collection
|
file.Collection = collection
|
||||||
results[index].Size, err = file.Upload(maxMB, master)
|
results[index].Size, err = file.Upload(maxMB, master, secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
results[index].Error = err.Error()
|
results[index].Error = err.Error()
|
||||||
}
|
}
|
||||||
@@ -101,7 +105,8 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
|
|||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error) {
|
func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
|
||||||
|
jwt := security.GenJwt(secret, fi.Fid)
|
||||||
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
fileUrl := "http://" + fi.Server + "/" + fi.Fid
|
||||||
if fi.ModTime != 0 {
|
if fi.ModTime != 0 {
|
||||||
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
|
||||||
@@ -114,16 +119,20 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
|
|||||||
chunks := fi.FileSize/chunkSize + 1
|
chunks := fi.FileSize/chunkSize + 1
|
||||||
fids := make([]string, 0)
|
fids := make([]string, 0)
|
||||||
for i := int64(0); i < chunks; i++ {
|
for i := int64(0); i < chunks; i++ {
|
||||||
id, count, e := upload_one_chunk(fi.FileName+"-"+strconv.FormatInt(i+1, 10), io.LimitReader(fi.Reader, chunkSize), master, fi.Replication, fi.Collection, fi.Ttl)
|
id, count, e := upload_one_chunk(
|
||||||
|
fi.FileName+"-"+strconv.FormatInt(i+1, 10),
|
||||||
|
io.LimitReader(fi.Reader, chunkSize),
|
||||||
|
master, fi.Replication, fi.Collection, fi.Ttl,
|
||||||
|
jwt)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return 0, e
|
return 0, e
|
||||||
}
|
}
|
||||||
fids = append(fids, id)
|
fids = append(fids, id)
|
||||||
retSize += count
|
retSize += count
|
||||||
}
|
}
|
||||||
err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids)
|
err = upload_file_id_list(fileUrl, fi.FileName+"-list", fids, jwt)
|
||||||
} else {
|
} else {
|
||||||
ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType)
|
ret, e := Upload(fileUrl, fi.FileName, fi.Reader, fi.IsGzipped, fi.MimeType, jwt)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return 0, e
|
return 0, e
|
||||||
}
|
}
|
||||||
@@ -132,24 +141,27 @@ func (fi FilePart) Upload(maxMB int, master string) (retSize uint32, err error)
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload_one_chunk(filename string, reader io.Reader, master, replication string, collection string, ttl string) (fid string, size uint32, e error) {
|
func upload_one_chunk(filename string, reader io.Reader, master,
|
||||||
|
replication string, collection string, ttl string, jwt security.EncodedJwt,
|
||||||
|
) (fid string, size uint32, e error) {
|
||||||
ret, err := Assign(master, 1, replication, collection, ttl)
|
ret, err := Assign(master, 1, replication, collection, ttl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", 0, err
|
return "", 0, err
|
||||||
}
|
}
|
||||||
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
|
fileUrl, fid := "http://"+ret.Url+"/"+ret.Fid, ret.Fid
|
||||||
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
glog.V(4).Info("Uploading part ", filename, " to ", fileUrl, "...")
|
||||||
uploadResult, uploadError := Upload(fileUrl, filename, reader, false, "application/octet-stream")
|
uploadResult, uploadError := Upload(fileUrl, filename, reader, false,
|
||||||
|
"application/octet-stream", jwt)
|
||||||
if uploadError != nil {
|
if uploadError != nil {
|
||||||
return fid, 0, uploadError
|
return fid, 0, uploadError
|
||||||
}
|
}
|
||||||
return fid, uploadResult.Size, nil
|
return fid, uploadResult.Size, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func upload_file_id_list(fileUrl, filename string, fids []string) error {
|
func upload_file_id_list(fileUrl, filename string, fids []string, jwt security.EncodedJwt) error {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
buf.WriteString(strings.Join(fids, "\n"))
|
buf.WriteString(strings.Join(fids, "\n"))
|
||||||
glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
|
glog.V(4).Info("Uploading final list ", filename, " to ", fileUrl, "...")
|
||||||
_, e := Upload(fileUrl, filename, &buf, false, "text/plain")
|
_, e := Upload(fileUrl, filename, &buf, false, "text/plain", jwt)
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
)
|
)
|
||||||
|
|
||||||
type UploadResult struct {
|
type UploadResult struct {
|
||||||
@@ -35,13 +36,13 @@ func init() {
|
|||||||
|
|
||||||
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
|
||||||
|
|
||||||
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string) (*UploadResult, error) {
|
func Upload(uploadUrl string, filename string, reader io.Reader, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||||
return upload_content(uploadUrl, func(w io.Writer) (err error) {
|
return upload_content(uploadUrl, func(w io.Writer) (err error) {
|
||||||
_, err = io.Copy(w, reader)
|
_, err = io.Copy(w, reader)
|
||||||
return
|
return
|
||||||
}, filename, isGzipped, mtype)
|
}, filename, isGzipped, mtype, jwt)
|
||||||
}
|
}
|
||||||
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string) (*UploadResult, error) {
|
func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error, filename string, isGzipped bool, mtype string, jwt security.EncodedJwt) (*UploadResult, error) {
|
||||||
body_buf := bytes.NewBufferString("")
|
body_buf := bytes.NewBufferString("")
|
||||||
body_writer := multipart.NewWriter(body_buf)
|
body_writer := multipart.NewWriter(body_buf)
|
||||||
h := make(textproto.MIMEHeader)
|
h := make(textproto.MIMEHeader)
|
||||||
@@ -55,6 +56,9 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
|
|||||||
if isGzipped {
|
if isGzipped {
|
||||||
h.Set("Content-Encoding", "gzip")
|
h.Set("Content-Encoding", "gzip")
|
||||||
}
|
}
|
||||||
|
if jwt != "" {
|
||||||
|
h.Set("Authorization", "BEARER "+string(jwt))
|
||||||
|
}
|
||||||
file_writer, cp_err := body_writer.CreatePart(h)
|
file_writer, cp_err := body_writer.CreatePart(h)
|
||||||
if cp_err != nil {
|
if cp_err != nil {
|
||||||
glog.V(0).Infoln("error creating form file", cp_err.Error())
|
glog.V(0).Infoln("error creating form file", cp_err.Error())
|
||||||
|
|||||||
@@ -5,11 +5,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
"github.com/dgrijalva/jwt-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -44,24 +41,38 @@ https://github.com/pkieltyka/jwtauth/blob/master/jwtauth.go
|
|||||||
*/
|
*/
|
||||||
type Guard struct {
|
type Guard struct {
|
||||||
whiteList []string
|
whiteList []string
|
||||||
secretKey string
|
SecretKey Secret
|
||||||
|
|
||||||
isActive bool
|
isActive bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGuard(whiteList []string, secretKey string) *Guard {
|
func NewGuard(whiteList []string, secretKey string) *Guard {
|
||||||
g := &Guard{whiteList: whiteList, secretKey: secretKey}
|
g := &Guard{whiteList: whiteList, SecretKey: Secret(secretKey)}
|
||||||
g.isActive = len(g.whiteList) != 0 || len(g.secretKey) != 0
|
g.isActive = len(g.whiteList) != 0 || len(g.SecretKey) != 0
|
||||||
return g
|
return g
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Guard) WhiteList(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if !g.isActive {
|
||||||
|
//if no security needed, just skip all checkings
|
||||||
|
return f
|
||||||
|
}
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if err := g.checkWhiteList(w, r); err != nil {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f(w, r)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
|
func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
|
||||||
if !g.isActive {
|
if !g.isActive {
|
||||||
//if no security needed, just skip all checkings
|
//if no security needed, just skip all checkings
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
if err := g.doCheck(w, r); err != nil {
|
if err := g.checkJwt(w, r); err != nil {
|
||||||
w.WriteHeader(http.StatusUnauthorized)
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -69,76 +80,48 @@ func (g *Guard) Secure(f func(w http.ResponseWriter, r *http.Request)) func(w ht
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) NewToken() (tokenString string, err error) {
|
func (g *Guard) checkWhiteList(w http.ResponseWriter, r *http.Request) error {
|
||||||
m := make(map[string]interface{})
|
if len(g.whiteList) == 0 {
|
||||||
m["exp"] = time.Now().Unix() + 10
|
return nil
|
||||||
return g.Encode(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Guard) Encode(claims map[string]interface{}) (tokenString string, err error) {
|
|
||||||
if !g.isActive {
|
|
||||||
return "", nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
t := jwt.New(jwt.GetSigningMethod("HS256"))
|
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||||
t.Claims = claims
|
if err == nil {
|
||||||
return t.SignedString(g.secretKey)
|
for _, ip := range g.whiteList {
|
||||||
}
|
if ip == host {
|
||||||
|
return nil
|
||||||
func (g *Guard) Decode(tokenString string) (token *jwt.Token, err error) {
|
|
||||||
return jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
|
|
||||||
return g.secretKey, nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Guard) doCheck(w http.ResponseWriter, r *http.Request) error {
|
|
||||||
if len(g.whiteList) != 0 {
|
|
||||||
host, _, err := net.SplitHostPort(r.RemoteAddr)
|
|
||||||
if err == nil {
|
|
||||||
for _, ip := range g.whiteList {
|
|
||||||
if ip == host {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(g.secretKey) != 0 {
|
glog.V(1).Infof("Not in whitelist: %s", r.RemoteAddr)
|
||||||
|
return fmt.Errorf("Not in whitelis: %s", r.RemoteAddr)
|
||||||
|
}
|
||||||
|
|
||||||
// Get token from query params
|
func (g *Guard) checkJwt(w http.ResponseWriter, r *http.Request) error {
|
||||||
tokenStr := r.URL.Query().Get("jwt")
|
if g.checkWhiteList(w, r) == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Get token from authorization header
|
if len(g.SecretKey) == 0 {
|
||||||
if tokenStr == "" {
|
return nil
|
||||||
bearer := r.Header.Get("Authorization")
|
}
|
||||||
if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
|
|
||||||
tokenStr = bearer[7:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get token from cookie
|
tokenStr := GetJwt(r)
|
||||||
if tokenStr == "" {
|
|
||||||
cookie, err := r.Cookie("jwt")
|
|
||||||
if err == nil {
|
|
||||||
tokenStr = cookie.Value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if tokenStr == "" {
|
if tokenStr == "" {
|
||||||
return ErrUnauthorized
|
return ErrUnauthorized
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the token
|
|
||||||
token, err := g.Decode(tokenStr)
|
|
||||||
if err != nil {
|
|
||||||
glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
|
|
||||||
return ErrUnauthorized
|
|
||||||
}
|
|
||||||
if !token.Valid {
|
|
||||||
glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
|
|
||||||
return ErrUnauthorized
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Verify the token
|
||||||
|
token, err := DecodeJwt(g.SecretKey, tokenStr)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(1).Infof("Token verification error from %s: %v", r.RemoteAddr, err)
|
||||||
|
return ErrUnauthorized
|
||||||
|
}
|
||||||
|
if !token.Valid {
|
||||||
|
glog.V(1).Infof("Token invliad from %s: %v", r.RemoteAddr, tokenStr)
|
||||||
|
return ErrUnauthorized
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("No permission from %s", r.RemoteAddr)
|
glog.V(1).Infof("No permission from %s", r.RemoteAddr)
|
||||||
|
|||||||
72
go/security/jwt.go
Normal file
72
go/security/jwt.go
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
package security
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
|
jwt "github.com/dgrijalva/jwt-go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EncodedJwt string
|
||||||
|
type Secret string
|
||||||
|
|
||||||
|
func GenJwt(secret Secret, fileId string) EncodedJwt {
|
||||||
|
if secret == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
t := jwt.New(jwt.GetSigningMethod("HS256"))
|
||||||
|
t.Claims["exp"] = time.Now().Unix() + 10
|
||||||
|
t.Claims["sub"] = fileId
|
||||||
|
encoded, e := t.SignedString(secret)
|
||||||
|
if e != nil {
|
||||||
|
glog.V(0).Infof("Failed to sign claims: %v", t.Claims)
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return EncodedJwt(encoded)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetJwt(r *http.Request) EncodedJwt {
|
||||||
|
|
||||||
|
// Get token from query params
|
||||||
|
tokenStr := r.URL.Query().Get("jwt")
|
||||||
|
|
||||||
|
// Get token from authorization header
|
||||||
|
if tokenStr == "" {
|
||||||
|
bearer := r.Header.Get("Authorization")
|
||||||
|
if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
|
||||||
|
tokenStr = bearer[7:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get token from cookie
|
||||||
|
if tokenStr == "" {
|
||||||
|
cookie, err := r.Cookie("jwt")
|
||||||
|
if err == nil {
|
||||||
|
tokenStr = cookie.Value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return EncodedJwt(tokenStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func EncodeJwt(secret Secret, claims map[string]interface{}) (EncodedJwt, error) {
|
||||||
|
if secret == "" {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
t := jwt.New(jwt.GetSigningMethod("HS256"))
|
||||||
|
t.Claims = claims
|
||||||
|
encoded, e := t.SignedString(secret)
|
||||||
|
return EncodedJwt(encoded), e
|
||||||
|
}
|
||||||
|
|
||||||
|
func DecodeJwt(secret Secret, tokenString EncodedJwt) (token *jwt.Token, err error) {
|
||||||
|
// check exp, nbf
|
||||||
|
return jwt.Parse(string(tokenString), func(token *jwt.Token) (interface{}, error) {
|
||||||
|
return secret, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
"github.com/chrislusf/weed-fs/go/operation"
|
"github.com/chrislusf/weed-fs/go/operation"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
"github.com/chrislusf/weed-fs/go/util"
|
"github.com/chrislusf/weed-fs/go/util"
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
)
|
)
|
||||||
@@ -260,7 +261,7 @@ func (s *Store) SetRack(rack string) {
|
|||||||
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
|
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
|
||||||
s.masterNodes = NewMasterNodes(bootstrapMaster)
|
s.masterNodes = NewMasterNodes(bootstrapMaster)
|
||||||
}
|
}
|
||||||
func (s *Store) Join() (masterNode string, e error) {
|
func (s *Store) Join() (masterNode string, secretKey security.Secret, e error) {
|
||||||
masterNode, e = s.masterNodes.findMaster()
|
masterNode, e = s.masterNodes.findMaster()
|
||||||
if e != nil {
|
if e != nil {
|
||||||
return
|
return
|
||||||
@@ -314,22 +315,23 @@ func (s *Store) Join() (masterNode string, e error) {
|
|||||||
|
|
||||||
data, err := proto.Marshal(joinMessage)
|
data, err := proto.Marshal(joinMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
|
jsonBlob, err := util.PostBytes("http://"+masterNode+"/dir/join", data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.masterNodes.reset()
|
s.masterNodes.reset()
|
||||||
return "", err
|
return "", "", err
|
||||||
}
|
}
|
||||||
var ret operation.JoinResult
|
var ret operation.JoinResult
|
||||||
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
|
if err := json.Unmarshal(jsonBlob, &ret); err != nil {
|
||||||
return masterNode, err
|
return masterNode, "", err
|
||||||
}
|
}
|
||||||
if ret.Error != "" {
|
if ret.Error != "" {
|
||||||
return masterNode, errors.New(ret.Error)
|
return masterNode, "", errors.New(ret.Error)
|
||||||
}
|
}
|
||||||
s.volumeSizeLimit = ret.VolumeSizeLimit
|
s.volumeSizeLimit = ret.VolumeSizeLimit
|
||||||
|
secretKey = security.Secret(ret.SecretKey)
|
||||||
s.connected = true
|
s.connected = true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -353,7 +355,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
|
|||||||
}
|
}
|
||||||
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
|
if s.volumeSizeLimit < v.ContentSize()+3*uint64(size) {
|
||||||
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
|
glog.V(0).Infoln("volume", i, "size", v.ContentSize(), "will exceed limit", s.volumeSizeLimit)
|
||||||
if _, e := s.Join(); e != nil {
|
if _, _, e := s.Join(); e != nil {
|
||||||
glog.V(0).Infoln("error when reporting size:", e)
|
glog.V(0).Infoln("error when reporting size:", e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,11 +7,18 @@ import (
|
|||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
"github.com/chrislusf/weed-fs/go/operation"
|
"github.com/chrislusf/weed-fs/go/operation"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
"github.com/chrislusf/weed-fs/go/storage"
|
"github.com/chrislusf/weed-fs/go/storage"
|
||||||
"github.com/chrislusf/weed-fs/go/util"
|
"github.com/chrislusf/weed-fs/go/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.VolumeId, needle *storage.Needle, r *http.Request) (size uint32, errorStatus string) {
|
func ReplicatedWrite(masterNode string, s *storage.Store,
|
||||||
|
volumeId storage.VolumeId, needle *storage.Needle,
|
||||||
|
r *http.Request) (size uint32, errorStatus string) {
|
||||||
|
|
||||||
|
//check JWT
|
||||||
|
jwt := security.GetJwt(r)
|
||||||
|
|
||||||
ret, err := s.Write(volumeId, needle)
|
ret, err := s.Write(volumeId, needle)
|
||||||
needToReplicate := !s.HasVolume(volumeId)
|
needToReplicate := !s.HasVolume(volumeId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -27,7 +34,10 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
|
|||||||
if needToReplicate { //send to other replica locations
|
if needToReplicate { //send to other replica locations
|
||||||
if r.FormValue("type") != "replicate" {
|
if r.FormValue("type") != "replicate" {
|
||||||
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
|
if !distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
|
||||||
_, err := operation.Upload("http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10), string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime))
|
_, err := operation.Upload(
|
||||||
|
"http://"+location.Url+r.URL.Path+"?type=replicate&ts="+strconv.FormatUint(needle.LastModified, 10),
|
||||||
|
string(needle.Name), bytes.NewReader(needle.Data), needle.IsGzipped(), string(needle.Mime),
|
||||||
|
jwt)
|
||||||
return err == nil
|
return err == nil
|
||||||
}) {
|
}) {
|
||||||
ret = 0
|
ret = 0
|
||||||
@@ -41,7 +51,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
|
|||||||
volumeId.String() + ": " + err.Error()
|
volumeId.String() + ": " + err.Error()
|
||||||
} else {
|
} else {
|
||||||
distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
|
distributedOperation(masterNode, s, volumeId, func(location operation.Location) bool {
|
||||||
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
|
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -49,7 +59,13 @@ func ReplicatedWrite(masterNode string, s *storage.Store, volumeId storage.Volum
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.VolumeId, n *storage.Needle, r *http.Request) (ret uint32) {
|
func ReplicatedDelete(masterNode string, store *storage.Store,
|
||||||
|
volumeId storage.VolumeId, n *storage.Needle,
|
||||||
|
r *http.Request) (ret uint32) {
|
||||||
|
|
||||||
|
//check JWT
|
||||||
|
jwt := security.GetJwt(r)
|
||||||
|
|
||||||
ret, err := store.Delete(volumeId, n)
|
ret, err := store.Delete(volumeId, n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(0).Infoln("delete error:", err)
|
glog.V(0).Infoln("delete error:", err)
|
||||||
@@ -63,7 +79,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.
|
|||||||
if needToReplicate { //send to other replica locations
|
if needToReplicate { //send to other replica locations
|
||||||
if r.FormValue("type") != "replicate" {
|
if r.FormValue("type") != "replicate" {
|
||||||
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
|
if !distributedOperation(masterNode, store, volumeId, func(location operation.Location) bool {
|
||||||
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate")
|
return nil == util.Delete("http://"+location.Url+r.URL.Path+"?type=replicate", jwt)
|
||||||
}) {
|
}) {
|
||||||
ret = 0
|
ret = 0
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -63,8 +65,11 @@ func Get(url string) ([]byte, error) {
|
|||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func Delete(url string) error {
|
func Delete(url string, jwt security.EncodedJwt) error {
|
||||||
req, err := http.NewRequest("DELETE", url, nil)
|
req, err := http.NewRequest("DELETE", url, nil)
|
||||||
|
if jwt != "" {
|
||||||
|
req.Header.Set("Authorization", "BEARER "+string(jwt))
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
"github.com/chrislusf/weed-fs/go/operation"
|
"github.com/chrislusf/weed-fs/go/operation"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
"github.com/chrislusf/weed-fs/go/util"
|
"github.com/chrislusf/weed-fs/go/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -32,6 +33,7 @@ type BenchmarkOptions struct {
|
|||||||
collection *string
|
collection *string
|
||||||
cpuprofile *string
|
cpuprofile *string
|
||||||
maxCpu *int
|
maxCpu *int
|
||||||
|
secretKey *string
|
||||||
vid2server map[string]string //cache for vid locations
|
vid2server map[string]string //cache for vid locations
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -56,6 +58,7 @@ func init() {
|
|||||||
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
|
b.collection = cmdBenchmark.Flag.String("collection", "benchmark", "write data to this collection")
|
||||||
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
|
b.cpuprofile = cmdBenchmark.Flag.String("cpuprofile", "", "cpu profile output file")
|
||||||
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
b.maxCpu = cmdBenchmark.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
||||||
|
b.secretKey = cmdBenchmark.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
b.vid2server = make(map[string]string)
|
b.vid2server = make(map[string]string)
|
||||||
sharedBytes = make([]byte, 1024)
|
sharedBytes = make([]byte, 1024)
|
||||||
}
|
}
|
||||||
@@ -181,6 +184,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
|
|||||||
defer wait.Done()
|
defer wait.Done()
|
||||||
delayedDeleteChan := make(chan *delayedFile, 100)
|
delayedDeleteChan := make(chan *delayedFile, 100)
|
||||||
var waitForDeletions sync.WaitGroup
|
var waitForDeletions sync.WaitGroup
|
||||||
|
secret := security.Secret(*b.secretKey)
|
||||||
|
|
||||||
for i := 0; i < 7; i++ {
|
for i := 0; i < 7; i++ {
|
||||||
waitForDeletions.Add(1)
|
waitForDeletions.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -189,7 +194,8 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
|
|||||||
if df.enterTime.After(time.Now()) {
|
if df.enterTime.After(time.Now()) {
|
||||||
time.Sleep(df.enterTime.Sub(time.Now()))
|
time.Sleep(df.enterTime.Sub(time.Now()))
|
||||||
}
|
}
|
||||||
if e := util.Delete("http://" + df.fp.Server + "/" + df.fp.Fid); e == nil {
|
if e := util.Delete("http://"+df.fp.Server+"/"+df.fp.Fid,
|
||||||
|
security.GenJwt(secret, df.fp.Fid)); e == nil {
|
||||||
s.completed++
|
s.completed++
|
||||||
} else {
|
} else {
|
||||||
s.failed++
|
s.failed++
|
||||||
@@ -204,7 +210,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
|
|||||||
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
|
fp := &operation.FilePart{Reader: &FakeReader{id: uint64(id), size: fileSize}, FileSize: fileSize}
|
||||||
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
|
if assignResult, err := operation.Assign(*b.server, 1, "", *b.collection, ""); err == nil {
|
||||||
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
|
fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
|
||||||
if _, err := fp.Upload(0, *b.server); err == nil {
|
if _, err := fp.Upload(0, *b.server, secret); err == nil {
|
||||||
if rand.Intn(100) < *b.deletePercentage {
|
if rand.Intn(100) < *b.deletePercentage {
|
||||||
s.total++
|
s.total++
|
||||||
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
|
delayedDeleteChan <- &delayedFile{time.Now().Add(time.Second), fp}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ type FilerOptions struct {
|
|||||||
defaultReplicaPlacement *string
|
defaultReplicaPlacement *string
|
||||||
dir *string
|
dir *string
|
||||||
redirectOnRead *bool
|
redirectOnRead *bool
|
||||||
|
secretKey *string
|
||||||
cassandra_server *string
|
cassandra_server *string
|
||||||
cassandra_keyspace *string
|
cassandra_keyspace *string
|
||||||
redis_server *string
|
redis_server *string
|
||||||
@@ -40,6 +41,8 @@ func init() {
|
|||||||
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
|
f.cassandra_keyspace = cmdFiler.Flag.String("cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
|
||||||
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
|
f.redis_server = cmdFiler.Flag.String("redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
|
||||||
f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
|
f.redis_database = cmdFiler.Flag.Int("redis.database", 0, "the database on the redis server")
|
||||||
|
f.secretKey = cmdFiler.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdFiler = &Command{
|
var cmdFiler = &Command{
|
||||||
@@ -73,6 +76,7 @@ func runFiler(cmd *Command, args []string) bool {
|
|||||||
r := http.NewServeMux()
|
r := http.NewServeMux()
|
||||||
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
|
_, nfs_err := weed_server.NewFilerServer(r, *f.port, *f.master, *f.dir, *f.collection,
|
||||||
*f.defaultReplicaPlacement, *f.redirectOnRead,
|
*f.defaultReplicaPlacement, *f.redirectOnRead,
|
||||||
|
*f.secretKey,
|
||||||
*f.cassandra_server, *f.cassandra_keyspace,
|
*f.cassandra_server, *f.cassandra_keyspace,
|
||||||
*f.redis_server, *f.redis_database,
|
*f.redis_server, *f.redis_database,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ var (
|
|||||||
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
mMaxCpu = cmdMaster.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs")
|
||||||
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
|
garbageThreshold = cmdMaster.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
|
||||||
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
|
masterWhiteListOption = cmdMaster.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
|
||||||
masterSecureKey = cmdMaster.Flag.String("secure.key", "", "secret key to check permission")
|
masterSecureKey = cmdMaster.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
|
|
||||||
masterWhiteList []string
|
masterWhiteList []string
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ var (
|
|||||||
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
|
serverRack = cmdServer.Flag.String("rack", "", "current volume server's rack name")
|
||||||
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
|
serverWhiteListOption = cmdServer.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.")
|
||||||
serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list")
|
serverPeers = cmdServer.Flag.String("master.peers", "", "other master nodes in comma separated ip:masterPort list")
|
||||||
serverSecureKey = cmdServer.Flag.String("secure.key", "", "secret key to ensure authenticated access")
|
serverSecureKey = cmdServer.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
|
serverGarbageThreshold = cmdServer.Flag.String("garbageThreshold", "0.3", "threshold to vacuum and reclaim spaces")
|
||||||
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
|
masterPort = cmdServer.Flag.Int("master.port", 9333, "master server http listen port")
|
||||||
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
|
masterMetaFolder = cmdServer.Flag.String("master.dir", "", "data directory to store meta data, default to same as -dir specified")
|
||||||
@@ -86,10 +86,10 @@ func init() {
|
|||||||
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
|
filerOptions.cassandra_keyspace = cmdServer.Flag.String("filer.cassandra.keyspace", "seaweed", "keyspace of the cassandra server")
|
||||||
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
|
filerOptions.redis_server = cmdServer.Flag.String("filer.redis.server", "", "host:port of the redis server, e.g., 127.0.0.1:6379")
|
||||||
filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server")
|
filerOptions.redis_database = cmdServer.Flag.Int("filer.redis.database", 0, "the database on the redis server")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func runServer(cmd *Command, args []string) bool {
|
func runServer(cmd *Command, args []string) bool {
|
||||||
|
filerOptions.secretKey = serverSecureKey
|
||||||
if *serverOptions.cpuprofile != "" {
|
if *serverOptions.cpuprofile != "" {
|
||||||
f, err := os.Create(*serverOptions.cpuprofile)
|
f, err := os.Create(*serverOptions.cpuprofile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -162,6 +162,7 @@ func runServer(cmd *Command, args []string) bool {
|
|||||||
r := http.NewServeMux()
|
r := http.NewServeMux()
|
||||||
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
|
_, nfs_err := weed_server.NewFilerServer(r, *filerOptions.port, *filerOptions.master, *filerOptions.dir, *filerOptions.collection,
|
||||||
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
|
*filerOptions.defaultReplicaPlacement, *filerOptions.redirectOnRead,
|
||||||
|
*filerOptions.secretKey,
|
||||||
"", "",
|
"", "",
|
||||||
"", 0,
|
"", 0,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/operation"
|
"github.com/chrislusf/weed-fs/go/operation"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -15,6 +16,7 @@ var (
|
|||||||
uploadDir *string
|
uploadDir *string
|
||||||
uploadTtl *string
|
uploadTtl *string
|
||||||
include *string
|
include *string
|
||||||
|
uploadSecretKey *string
|
||||||
maxMB *int
|
maxMB *int
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -28,6 +30,7 @@ func init() {
|
|||||||
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
|
uploadCollection = cmdUpload.Flag.String("collection", "", "optional collection name")
|
||||||
uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
|
uploadTtl = cmdUpload.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y")
|
||||||
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
|
maxMB = cmdUpload.Flag.Int("maxMB", 0, "split files larger than the limit")
|
||||||
|
uploadSecretKey = cmdUpload.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)")
|
||||||
}
|
}
|
||||||
|
|
||||||
var cmdUpload = &Command{
|
var cmdUpload = &Command{
|
||||||
@@ -54,6 +57,7 @@ var cmdUpload = &Command{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runUpload(cmd *Command, args []string) bool {
|
func runUpload(cmd *Command, args []string) bool {
|
||||||
|
secret := security.Secret(*uploadSecretKey)
|
||||||
if len(cmdUpload.Flag.Args()) == 0 {
|
if len(cmdUpload.Flag.Args()) == 0 {
|
||||||
if *uploadDir == "" {
|
if *uploadDir == "" {
|
||||||
return false
|
return false
|
||||||
@@ -70,7 +74,9 @@ func runUpload(cmd *Command, args []string) bool {
|
|||||||
if e != nil {
|
if e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
results, e := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
|
results, e := operation.SubmitFiles(*server, parts,
|
||||||
|
*uploadReplication, *uploadCollection,
|
||||||
|
*uploadTtl, *maxMB, secret)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
if e != nil {
|
if e != nil {
|
||||||
@@ -87,7 +93,9 @@ func runUpload(cmd *Command, args []string) bool {
|
|||||||
if e != nil {
|
if e != nil {
|
||||||
fmt.Println(e.Error())
|
fmt.Println(e.Error())
|
||||||
}
|
}
|
||||||
results, _ := operation.SubmitFiles(*server, parts, *uploadReplication, *uploadCollection, *uploadTtl, *maxMB)
|
results, _ := operation.SubmitFiles(*server, parts,
|
||||||
|
*uploadReplication, *uploadCollection,
|
||||||
|
*uploadTtl, *maxMB, secret)
|
||||||
bytes, _ := json.Marshal(results)
|
bytes, _ := json.Marshal(results)
|
||||||
fmt.Println(string(bytes))
|
fmt.Println(string(bytes))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
|
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
"github.com/chrislusf/weed-fs/go/operation"
|
"github.com/chrislusf/weed-fs/go/operation"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
"github.com/chrislusf/weed-fs/go/stats"
|
"github.com/chrislusf/weed-fs/go/stats"
|
||||||
"github.com/chrislusf/weed-fs/go/storage"
|
"github.com/chrislusf/weed-fs/go/storage"
|
||||||
"github.com/chrislusf/weed-fs/go/util"
|
"github.com/chrislusf/weed-fs/go/util"
|
||||||
@@ -75,6 +76,7 @@ func debug(params ...interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
|
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) {
|
||||||
|
jwt := security.GetJwt(r)
|
||||||
m := make(map[string]interface{})
|
m := make(map[string]interface{})
|
||||||
if r.Method != "POST" {
|
if r.Method != "POST" {
|
||||||
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
|
writeJsonError(w, r, http.StatusMethodNotAllowed, errors.New("Only submit via POST!"))
|
||||||
@@ -102,7 +104,7 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st
|
|||||||
}
|
}
|
||||||
|
|
||||||
debug("upload file to store", url)
|
debug("upload file to store", url)
|
||||||
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType)
|
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, jwt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, err)
|
writeJsonError(w, r, http.StatusInternalServerError, err)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"github.com/chrislusf/weed-fs/go/filer/flat_namespace"
|
"github.com/chrislusf/weed-fs/go/filer/flat_namespace"
|
||||||
"github.com/chrislusf/weed-fs/go/filer/redis_store"
|
"github.com/chrislusf/weed-fs/go/filer/redis_store"
|
||||||
"github.com/chrislusf/weed-fs/go/glog"
|
"github.com/chrislusf/weed-fs/go/glog"
|
||||||
|
"github.com/chrislusf/weed-fs/go/security"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FilerServer struct {
|
type FilerServer struct {
|
||||||
@@ -18,11 +19,13 @@ type FilerServer struct {
|
|||||||
collection string
|
collection string
|
||||||
defaultReplication string
|
defaultReplication string
|
||||||
redirectOnRead bool
|
redirectOnRead bool
|
||||||
|
secret security.Secret
|
||||||
filer filer.Filer
|
filer filer.Filer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
|
func NewFilerServer(r *http.ServeMux, port int, master string, dir string, collection string,
|
||||||
replication string, redirectOnRead bool,
|
replication string, redirectOnRead bool,
|
||||||
|
secret string,
|
||||||
cassandra_server string, cassandra_keyspace string,
|
cassandra_server string, cassandra_keyspace string,
|
||||||
redis_server string, redis_database int,
|
redis_server string, redis_database int,
|
||||||
) (fs *FilerServer, err error) {
|
) (fs *FilerServer, err error) {
|
||||||
@@ -56,3 +59,7 @@ func NewFilerServer(r *http.ServeMux, port int, master string, dir string, colle
|
|||||||
|
|
||||||
return fs, nil
|
return fs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fs *FilerServer) jwt(fileId string) security.EncodedJwt {
|
||||||
|
return security.GenJwt(fs.secret, fileId)
|
||||||
|
}
|
||||||
|
|||||||
@@ -170,7 +170,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
if ret.Name != "" {
|
if ret.Name != "" {
|
||||||
path += ret.Name
|
path += ret.Name
|
||||||
} else {
|
} else {
|
||||||
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
|
||||||
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
|
glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
|
||||||
writeJsonError(w, r, http.StatusInternalServerError,
|
writeJsonError(w, r, http.StatusInternalServerError,
|
||||||
errors.New("Can not to write to folder "+path+" without a file name"))
|
errors.New("Can not to write to folder "+path+" without a file name"))
|
||||||
@@ -179,7 +179,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
|
glog.V(4).Infoln("saving", path, "=>", assignResult.Fid)
|
||||||
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
|
if db_err := fs.filer.CreateFile(path, assignResult.Fid); db_err != nil {
|
||||||
operation.DeleteFile(fs.master, assignResult.Fid) //clean up
|
operation.DeleteFile(fs.master, assignResult.Fid, fs.jwt(assignResult.Fid)) //clean up
|
||||||
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
|
glog.V(0).Infof("failing to write %s to filer server : %v", path, db_err)
|
||||||
writeJsonError(w, r, http.StatusInternalServerError, db_err)
|
writeJsonError(w, r, http.StatusInternalServerError, db_err)
|
||||||
return
|
return
|
||||||
@@ -199,7 +199,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
} else {
|
} else {
|
||||||
fid, err = fs.filer.DeleteFile(r.URL.Path)
|
fid, err = fs.filer.DeleteFile(r.URL.Path)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = operation.DeleteFile(fs.master, fid)
|
err = operation.DeleteFile(fs.master, fid, fs.jwt(fid))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ type MasterServer struct {
|
|||||||
pulseSeconds int
|
pulseSeconds int
|
||||||
defaultReplicaPlacement string
|
defaultReplicaPlacement string
|
||||||
garbageThreshold string
|
garbageThreshold string
|
||||||
|
guard *security.Guard
|
||||||
|
|
||||||
Topo *topology.Topology
|
Topo *topology.Topology
|
||||||
vg *topology.VolumeGrowth
|
vg *topology.VolumeGrowth
|
||||||
@@ -57,22 +58,22 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string,
|
|||||||
ms.vg = topology.NewDefaultVolumeGrowth()
|
ms.vg = topology.NewDefaultVolumeGrowth()
|
||||||
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
|
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB")
|
||||||
|
|
||||||
guard := security.NewGuard(whiteList, secureKey)
|
ms.guard = security.NewGuard(whiteList, secureKey)
|
||||||
|
|
||||||
r.HandleFunc("/dir/assign", ms.proxyToLeader(guard.Secure(ms.dirAssignHandler)))
|
r.HandleFunc("/dir/assign", ms.proxyToLeader(ms.guard.WhiteList(ms.dirAssignHandler)))
|
||||||
r.HandleFunc("/dir/lookup", ms.proxyToLeader(guard.Secure(ms.dirLookupHandler)))
|
r.HandleFunc("/dir/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.dirLookupHandler)))
|
||||||
r.HandleFunc("/dir/join", ms.proxyToLeader(guard.Secure(ms.dirJoinHandler)))
|
r.HandleFunc("/dir/join", ms.proxyToLeader(ms.guard.WhiteList(ms.dirJoinHandler)))
|
||||||
r.HandleFunc("/dir/status", ms.proxyToLeader(guard.Secure(ms.dirStatusHandler)))
|
r.HandleFunc("/dir/status", ms.proxyToLeader(ms.guard.WhiteList(ms.dirStatusHandler)))
|
||||||
r.HandleFunc("/col/delete", ms.proxyToLeader(guard.Secure(ms.collectionDeleteHandler)))
|
r.HandleFunc("/col/delete", ms.proxyToLeader(ms.guard.WhiteList(ms.collectionDeleteHandler)))
|
||||||
r.HandleFunc("/vol/lookup", ms.proxyToLeader(guard.Secure(ms.volumeLookupHandler)))
|
r.HandleFunc("/vol/lookup", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeLookupHandler)))
|
||||||
r.HandleFunc("/vol/grow", ms.proxyToLeader(guard.Secure(ms.volumeGrowHandler)))
|
r.HandleFunc("/vol/grow", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeGrowHandler)))
|
||||||
r.HandleFunc("/vol/status", ms.proxyToLeader(guard.Secure(ms.volumeStatusHandler)))
|
r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler)))
|
||||||
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(guard.Secure(ms.volumeVacuumHandler)))
|
r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler)))
|
||||||
r.HandleFunc("/submit", guard.Secure(ms.submitFromMasterServerHandler))
|
r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler))
|
||||||
r.HandleFunc("/delete", guard.Secure(ms.deleteFromMasterServerHandler))
|
r.HandleFunc("/delete", ms.guard.WhiteList(ms.deleteFromMasterServerHandler))
|
||||||
r.HandleFunc("/{fileId}", ms.redirectHandler)
|
r.HandleFunc("/{fileId}", ms.redirectHandler)
|
||||||
r.HandleFunc("/stats/counter", guard.Secure(statsCounterHandler))
|
r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler))
|
||||||
r.HandleFunc("/stats/memory", guard.Secure(statsMemoryHandler))
|
r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler))
|
||||||
|
|
||||||
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
|
ms.Topo.StartRefreshWritableVolumes(garbageThreshold)
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,10 @@ func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ms.Topo.ProcessJoinMessage(joinMessage)
|
ms.Topo.ProcessJoinMessage(joinMessage)
|
||||||
writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024})
|
writeJsonQuiet(w, r, http.StatusOK, operation.JoinResult{
|
||||||
|
VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
|
||||||
|
SecretKey: string(ms.guard.SecretKey),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|||||||
@@ -41,16 +41,16 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
|
|||||||
|
|
||||||
vs.guard = security.NewGuard(whiteList, "")
|
vs.guard = security.NewGuard(whiteList, "")
|
||||||
|
|
||||||
adminMux.HandleFunc("/status", vs.guard.Secure(vs.statusHandler))
|
adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
|
||||||
adminMux.HandleFunc("/admin/assign_volume", vs.guard.Secure(vs.assignVolumeHandler))
|
adminMux.HandleFunc("/admin/assign_volume", vs.guard.WhiteList(vs.assignVolumeHandler))
|
||||||
adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.Secure(vs.vacuumVolumeCheckHandler))
|
adminMux.HandleFunc("/admin/vacuum_volume_check", vs.guard.WhiteList(vs.vacuumVolumeCheckHandler))
|
||||||
adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.Secure(vs.vacuumVolumeCompactHandler))
|
adminMux.HandleFunc("/admin/vacuum_volume_compact", vs.guard.WhiteList(vs.vacuumVolumeCompactHandler))
|
||||||
adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.Secure(vs.vacuumVolumeCommitHandler))
|
adminMux.HandleFunc("/admin/vacuum_volume_commit", vs.guard.WhiteList(vs.vacuumVolumeCommitHandler))
|
||||||
adminMux.HandleFunc("/admin/freeze_volume", vs.guard.Secure(vs.freezeVolumeHandler))
|
adminMux.HandleFunc("/admin/freeze_volume", vs.guard.WhiteList(vs.freezeVolumeHandler))
|
||||||
adminMux.HandleFunc("/admin/delete_collection", vs.guard.Secure(vs.deleteCollectionHandler))
|
adminMux.HandleFunc("/admin/delete_collection", vs.guard.WhiteList(vs.deleteCollectionHandler))
|
||||||
adminMux.HandleFunc("/stats/counter", vs.guard.Secure(statsCounterHandler))
|
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
|
||||||
adminMux.HandleFunc("/stats/memory", vs.guard.Secure(statsMemoryHandler))
|
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
|
||||||
adminMux.HandleFunc("/stats/disk", vs.guard.Secure(vs.statsDiskHandler))
|
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))
|
||||||
publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
|
publicMux.HandleFunc("/delete", vs.guard.Secure(vs.batchDeleteHandler))
|
||||||
publicMux.HandleFunc("/", vs.storeHandler)
|
publicMux.HandleFunc("/", vs.storeHandler)
|
||||||
|
|
||||||
@@ -61,12 +61,13 @@ func NewVolumeServer(publicMux, adminMux *http.ServeMux, ip string,
|
|||||||
vs.store.SetDataCenter(vs.dataCenter)
|
vs.store.SetDataCenter(vs.dataCenter)
|
||||||
vs.store.SetRack(vs.rack)
|
vs.store.SetRack(vs.rack)
|
||||||
for {
|
for {
|
||||||
master, err := vs.store.Join()
|
master, secretKey, err := vs.store.Join()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if !connected {
|
if !connected {
|
||||||
connected = true
|
connected = true
|
||||||
vs.SetMasterNode(master)
|
vs.SetMasterNode(master)
|
||||||
glog.V(0).Infoln("Volume Server Connected with master at", master, "and set it as masterNode")
|
vs.guard.SecretKey = secretKey
|
||||||
|
glog.V(0).Infoln("Volume Server Connected with master at", master)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
|
glog.V(4).Infoln("Volume Server Failed to talk with master:", err.Error())
|
||||||
@@ -102,3 +103,7 @@ func (vs *VolumeServer) Shutdown() {
|
|||||||
vs.store.Close()
|
vs.store.Close()
|
||||||
glog.V(0).Infoln("Shut down successfully!")
|
glog.V(0).Infoln("Shut down successfully!")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (vs *VolumeServer) jwt(fileId string) security.EncodedJwt {
|
||||||
|
return security.GenJwt(vs.guard.SecretKey, fileId)
|
||||||
|
}
|
||||||
|
|||||||
@@ -253,7 +253,8 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ret := operation.UploadResult{}
|
ret := operation.UploadResult{}
|
||||||
size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(), vs.store, volumeId, needle, r)
|
size, errorStatus := topology.ReplicatedWrite(vs.GetMasterNode(),
|
||||||
|
vs.store, volumeId, needle, r)
|
||||||
httpStatus := http.StatusCreated
|
httpStatus := http.StatusCreated
|
||||||
if errorStatus != "" {
|
if errorStatus != "" {
|
||||||
httpStatus = http.StatusInternalServerError
|
httpStatus = http.StatusInternalServerError
|
||||||
|
|||||||
Reference in New Issue
Block a user