Merge branch 'upstream_master' into store_s3cred
# Conflicts: # weed/s3api/filer_util.go
This commit is contained in:
@@ -3,10 +3,11 @@ package s3api
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
|
||||
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
|
||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||
"github.com/golang/protobuf/jsonpb"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
@@ -128,8 +129,14 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
|
||||
}
|
||||
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
errCode := iam.authRequest(r, action)
|
||||
identity, errCode := iam.authRequest(r, action)
|
||||
if errCode == s3err.ErrNone {
|
||||
if identity != nil && identity.Name != "" {
|
||||
r.Header.Set(xhttp.AmzIdentityId, identity.Name)
|
||||
if identity.isAdmin() {
|
||||
r.Header.Set(xhttp.AmzIsAdmin, "true")
|
||||
}
|
||||
}
|
||||
f(w, r)
|
||||
return
|
||||
}
|
||||
@@ -138,16 +145,16 @@ func (iam *IdentityAccessManagement) Auth(f http.HandlerFunc, action Action) htt
|
||||
}
|
||||
|
||||
// check whether the request has valid access keys
|
||||
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) s3err.ErrorCode {
|
||||
func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) (*Identity, s3err.ErrorCode) {
|
||||
var identity *Identity
|
||||
var s3Err s3err.ErrorCode
|
||||
var found bool
|
||||
switch getRequestAuthType(r) {
|
||||
case authTypeStreamingSigned:
|
||||
return s3err.ErrNone
|
||||
return identity, s3err.ErrNone
|
||||
case authTypeUnknown:
|
||||
glog.V(3).Infof("unknown auth type")
|
||||
return s3err.ErrAccessDenied
|
||||
return identity, s3err.ErrAccessDenied
|
||||
case authTypePresignedV2, authTypeSignedV2:
|
||||
glog.V(3).Infof("v2 auth type")
|
||||
identity, s3Err = iam.isReqAuthenticatedV2(r)
|
||||
@@ -156,22 +163,22 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
|
||||
identity, s3Err = iam.reqSignatureV4Verify(r)
|
||||
case authTypePostPolicy:
|
||||
glog.V(3).Infof("post policy auth type")
|
||||
return s3err.ErrNone
|
||||
return identity, s3err.ErrNone
|
||||
case authTypeJWT:
|
||||
glog.V(3).Infof("jwt auth type")
|
||||
return s3err.ErrNotImplemented
|
||||
return identity, s3err.ErrNotImplemented
|
||||
case authTypeAnonymous:
|
||||
identity, found = iam.lookupAnonymous()
|
||||
if !found {
|
||||
return s3err.ErrAccessDenied
|
||||
return identity, s3err.ErrAccessDenied
|
||||
}
|
||||
default:
|
||||
return s3err.ErrNotImplemented
|
||||
return identity, s3err.ErrNotImplemented
|
||||
}
|
||||
|
||||
glog.V(3).Infof("auth error: %v", s3Err)
|
||||
if s3Err != s3err.ErrNone {
|
||||
return s3Err
|
||||
return identity, s3Err
|
||||
}
|
||||
|
||||
glog.V(3).Infof("user name: %v actions: %v", identity.Name, identity.Actions)
|
||||
@@ -179,18 +186,16 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
|
||||
bucket, _ := getBucketAndObject(r)
|
||||
|
||||
if !identity.canDo(action, bucket) {
|
||||
return s3err.ErrAccessDenied
|
||||
return identity, s3err.ErrAccessDenied
|
||||
}
|
||||
|
||||
return s3err.ErrNone
|
||||
return identity, s3err.ErrNone
|
||||
|
||||
}
|
||||
|
||||
func (identity *Identity) canDo(action Action, bucket string) bool {
|
||||
for _, a := range identity.Actions {
|
||||
if a == "Admin" {
|
||||
return true
|
||||
}
|
||||
if identity.isAdmin() {
|
||||
return true
|
||||
}
|
||||
for _, a := range identity.Actions {
|
||||
if a == action {
|
||||
@@ -208,3 +213,12 @@ func (identity *Identity) canDo(action Action, bucket string) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (identity *Identity) isAdmin() bool {
|
||||
for _, a := range identity.Actions {
|
||||
if a == "Admin" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -3,13 +3,14 @@ package s3api
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
|
||||
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
"google.golang.org/grpc"
|
||||
"strings"
|
||||
"github.com/chrislusf/seaweedfs/weed/util"
|
||||
)
|
||||
|
||||
func (s3a *S3ApiServer) mkdir(parentDirectoryPath string, dirName string, fn func(entry *filer_pb.Entry)) error {
|
||||
@@ -78,6 +79,11 @@ func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isD
|
||||
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) getEntry(parentDirectoryPath, entryName string) (entry *filer_pb.Entry, err error) {
|
||||
fullPath := util.NewFullPath(parentDirectoryPath, entryName)
|
||||
return filer_pb.GetEntry(s3a, fullPath)
|
||||
}
|
||||
|
||||
func loadS3config(iam *IdentityAccessManagement, option *S3ApiServerOption) error {
|
||||
return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
|
||||
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
|
||||
|
||||
@@ -28,3 +28,9 @@ const (
|
||||
AmzObjectTagging = "X-Amz-Tagging"
|
||||
AmzTagCount = "x-amz-tagging-count"
|
||||
)
|
||||
|
||||
// Non-Standard S3 HTTP request constants
|
||||
const (
|
||||
AmzIdentityId = "x-amz-identity-id"
|
||||
AmzIsAdmin = "x-amz-is-admin" // only set to http request header as a context
|
||||
)
|
||||
|
||||
@@ -4,11 +4,13 @@ import (
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||
"math"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
|
||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
|
||||
@@ -33,9 +35,14 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
|
||||
return
|
||||
}
|
||||
|
||||
identityId := r.Header.Get(xhttp.AmzIdentityId)
|
||||
|
||||
var buckets []*s3.Bucket
|
||||
for _, entry := range entries {
|
||||
if entry.IsDirectory {
|
||||
if !s3a.hasAccess(r, entry) {
|
||||
continue
|
||||
}
|
||||
buckets = append(buckets, &s3.Bucket{
|
||||
Name: aws.String(entry.Name),
|
||||
CreationDate: aws.Time(time.Unix(entry.Attributes.Crtime, 0).UTC()),
|
||||
@@ -45,8 +52,8 @@ func (s3a *S3ApiServer) ListBucketsHandler(w http.ResponseWriter, r *http.Reques
|
||||
|
||||
response = ListAllMyBucketsResult{
|
||||
Owner: &s3.Owner{
|
||||
ID: aws.String(""),
|
||||
DisplayName: aws.String(""),
|
||||
ID: aws.String(identityId),
|
||||
DisplayName: aws.String(identityId),
|
||||
},
|
||||
Buckets: buckets,
|
||||
}
|
||||
@@ -80,13 +87,25 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request)
|
||||
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||
return
|
||||
}
|
||||
if exist, err := s3a.exists(s3a.option.BucketsPath, bucket, true); err == nil && exist {
|
||||
errCode = s3err.ErrBucketAlreadyExists
|
||||
}
|
||||
if errCode != s3err.ErrNone {
|
||||
writeErrorResponse(w, errCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
fn := func(entry *filer_pb.Entry) {
|
||||
if identityId := r.Header.Get(xhttp.AmzIdentityId); identityId != "" {
|
||||
if entry.Extended == nil {
|
||||
entry.Extended = make(map[string][]byte)
|
||||
}
|
||||
entry.Extended[xhttp.AmzIdentityId] = []byte(identityId)
|
||||
}
|
||||
}
|
||||
|
||||
// create the folder for bucket, but lazily create actual collection
|
||||
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, nil); err != nil {
|
||||
if err := s3a.mkdir(s3a.option.BucketsPath, bucket, fn); err != nil {
|
||||
glog.Errorf("PutBucketHandler mkdir: %v", err)
|
||||
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||
return
|
||||
@@ -99,6 +118,11 @@ func (s3a *S3ApiServer) DeleteBucketHandler(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
bucket, _ := getBucketAndObject(r)
|
||||
|
||||
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
||||
writeErrorResponse(w, err, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
// delete collection
|
||||
@@ -128,28 +152,40 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request
|
||||
|
||||
bucket, _ := getBucketAndObject(r)
|
||||
|
||||
err := s3a.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||
|
||||
request := &filer_pb.LookupDirectoryEntryRequest{
|
||||
Directory: s3a.option.BucketsPath,
|
||||
Name: bucket,
|
||||
}
|
||||
|
||||
glog.V(1).Infof("lookup bucket: %v", request)
|
||||
if _, err := filer_pb.LookupEntry(client, request); err != nil {
|
||||
if err == filer_pb.ErrNotFound {
|
||||
return filer_pb.ErrNotFound
|
||||
}
|
||||
return fmt.Errorf("lookup bucket %s/%s: %v", s3a.option.BucketsPath, bucket, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
writeErrorResponse(w, s3err.ErrNoSuchBucket, r.URL)
|
||||
if err := s3a.checkBucket(r, bucket); err != s3err.ErrNone {
|
||||
writeErrorResponse(w, err, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
writeSuccessResponseEmpty(w)
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode {
|
||||
entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
|
||||
if entry == nil || err == filer_pb.ErrNotFound {
|
||||
return s3err.ErrNoSuchBucket
|
||||
}
|
||||
|
||||
if !s3a.hasAccess(r, entry) {
|
||||
return s3err.ErrAccessDenied
|
||||
}
|
||||
return s3err.ErrNone
|
||||
}
|
||||
|
||||
func (s3a *S3ApiServer) hasAccess(r *http.Request, entry *filer_pb.Entry) bool {
|
||||
isAdmin := r.Header.Get(xhttp.AmzIsAdmin) != ""
|
||||
if isAdmin {
|
||||
return true
|
||||
}
|
||||
if entry.Extended == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
identityId := r.Header.Get(xhttp.AmzIdentityId)
|
||||
if id, ok := entry.Extended[xhttp.AmzIdentityId]; ok {
|
||||
if identityId != string(id) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package s3api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -47,6 +48,7 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
defer util.CloseResponse(resp)
|
||||
|
||||
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
|
||||
etag, errCode := s3a.putToFiler(r, dstUrl, resp.Body)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
@@ -127,6 +129,7 @@ func (s3a *S3ApiServer) CopyObjectPartHandler(w http.ResponseWriter, r *http.Req
|
||||
}
|
||||
defer dataReader.Close()
|
||||
|
||||
glog.V(2).Infof("copy from %s to %s", srcUrl, dstUrl)
|
||||
etag, errCode := s3a.putToFiler(r, dstUrl, dataReader)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
|
||||
@@ -113,12 +113,6 @@ func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Reque
|
||||
|
||||
bucket, object := getBucketAndObject(r)
|
||||
|
||||
response, _ := s3a.listFilerEntries(bucket, object, 1, "", "/")
|
||||
if len(response.Contents) != 0 && strings.HasSuffix(object, "/") {
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
|
||||
destUrl := fmt.Sprintf("http://%s%s/%s%s?recursive=true",
|
||||
s3a.option.Filer, s3a.option.BucketsPath, bucket, object)
|
||||
|
||||
@@ -266,11 +260,6 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
|
||||
|
||||
resp, postErr := client.Do(proxyReq)
|
||||
|
||||
if resp.ContentLength == -1 && !strings.HasSuffix(destUrl, "/") {
|
||||
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
if postErr != nil {
|
||||
glog.Errorf("post to filer: %v", postErr)
|
||||
writeErrorResponse(w, s3err.ErrInternalError, r.URL)
|
||||
@@ -278,6 +267,11 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
|
||||
}
|
||||
defer util.CloseResponse(resp)
|
||||
|
||||
if (resp.ContentLength == -1 || resp.StatusCode == 404) && !strings.HasSuffix(destUrl, "/") {
|
||||
writeErrorResponse(w, s3err.ErrNoSuchKey, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
responseFn(resp, w)
|
||||
|
||||
}
|
||||
@@ -323,7 +317,7 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
|
||||
|
||||
resp_body, ra_err := ioutil.ReadAll(resp.Body)
|
||||
if ra_err != nil {
|
||||
glog.Errorf("upload to filer response read: %v", ra_err)
|
||||
glog.Errorf("upload to filer response read %d: %v", resp.StatusCode, ra_err)
|
||||
return etag, s3err.ErrInternalError
|
||||
}
|
||||
var ret weed_server.FilerPostResult
|
||||
|
||||
@@ -2,6 +2,7 @@ package s3api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||
"github.com/chrislusf/seaweedfs/weed/s3api/s3err"
|
||||
"net/http"
|
||||
"net/url"
|
||||
@@ -28,13 +29,13 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
|
||||
Key: objectKey(aws.String(object)),
|
||||
})
|
||||
|
||||
glog.V(2).Info("NewMultipartUploadHandler", string(encodeResponse(response)), errCode)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
writeErrorResponse(w, errCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// println("NewMultipartUploadHandler", string(encodeResponse(response)))
|
||||
|
||||
writeSuccessResponseXML(w, encodeResponse(response))
|
||||
|
||||
}
|
||||
@@ -52,7 +53,7 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
|
||||
UploadId: aws.String(uploadID),
|
||||
})
|
||||
|
||||
// println("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
|
||||
glog.V(2).Info("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
writeErrorResponse(w, errCode, r.URL)
|
||||
@@ -81,7 +82,7 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
|
||||
return
|
||||
}
|
||||
|
||||
// println("AbortMultipartUploadHandler", string(encodeResponse(response)))
|
||||
glog.V(2).Info("AbortMultipartUploadHandler", string(encodeResponse(response)))
|
||||
|
||||
writeSuccessResponseXML(w, encodeResponse(response))
|
||||
|
||||
@@ -114,13 +115,14 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
|
||||
UploadIdMarker: aws.String(uploadIDMarker),
|
||||
})
|
||||
|
||||
glog.V(2).Info("ListMultipartUploadsHandler", string(encodeResponse(response)), errCode)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
writeErrorResponse(w, errCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO handle encodingType
|
||||
// println("ListMultipartUploadsHandler", string(encodeResponse(response)))
|
||||
|
||||
writeSuccessResponseXML(w, encodeResponse(response))
|
||||
}
|
||||
@@ -147,13 +149,13 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
|
||||
UploadId: aws.String(uploadID),
|
||||
})
|
||||
|
||||
glog.V(2).Info("ListObjectPartsHandler", string(encodeResponse(response)), errCode)
|
||||
|
||||
if errCode != s3err.ErrNone {
|
||||
writeErrorResponse(w, errCode, r.URL)
|
||||
return
|
||||
}
|
||||
|
||||
// println("ListObjectPartsHandler", string(encodeResponse(response)))
|
||||
|
||||
writeSuccessResponseXML(w, encodeResponse(response))
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user