working S3 multipart uploads
This commit is contained in:
@@ -0,0 +1,63 @@
|
|||||||
|
package com.seaweedfs.s3;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonServiceException;
|
||||||
|
import com.amazonaws.ClientConfiguration;
|
||||||
|
import com.amazonaws.SdkClientException;
|
||||||
|
import com.amazonaws.auth.AWSCredentials;
|
||||||
|
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||||
|
import com.amazonaws.auth.BasicAWSCredentials;
|
||||||
|
import com.amazonaws.client.builder.AwsClientBuilder;
|
||||||
|
import com.amazonaws.regions.Regions;
|
||||||
|
import com.amazonaws.services.s3.AmazonS3;
|
||||||
|
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
|
||||||
|
import com.amazonaws.services.s3.transfer.TransferManager;
|
||||||
|
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
|
||||||
|
import com.amazonaws.services.s3.transfer.Upload;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
public class HighLevelMultipartUpload {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
String bucketName = "javabucket";
|
||||||
|
String filePath = args[0];
|
||||||
|
File file = new File(filePath);
|
||||||
|
String keyName = "path/to/" + file.getName();
|
||||||
|
|
||||||
|
try {
|
||||||
|
AWSCredentials credentials = new BasicAWSCredentials("ANY-ACCESSKEYID", "ANY-SECRETACCESSKEY");
|
||||||
|
ClientConfiguration clientConfiguration = new ClientConfiguration();
|
||||||
|
clientConfiguration.setSignerOverride("AWSS3V4SignerType");
|
||||||
|
|
||||||
|
AmazonS3 s3Client = AmazonS3ClientBuilder
|
||||||
|
.standard()
|
||||||
|
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
|
||||||
|
"http://localhost:8333", Regions.US_WEST_1.name()))
|
||||||
|
.withPathStyleAccessEnabled(true)
|
||||||
|
.withClientConfiguration(clientConfiguration)
|
||||||
|
.withCredentials(new AWSStaticCredentialsProvider(credentials))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
TransferManager tm = TransferManagerBuilder.standard()
|
||||||
|
.withS3Client(s3Client)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// TransferManager processes all transfers asynchronously,
|
||||||
|
// so this call returns immediately.
|
||||||
|
Upload upload = tm.upload(bucketName, keyName, file);
|
||||||
|
System.out.println("Object upload started");
|
||||||
|
|
||||||
|
// Optionally, wait for the upload to finish before continuing.
|
||||||
|
upload.waitForCompletion();
|
||||||
|
System.out.println("Object upload complete");
|
||||||
|
} catch (AmazonServiceException e) {
|
||||||
|
// The call was transmitted successfully, but Amazon S3 couldn't process
|
||||||
|
// it, so it returned an error response.
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (SdkClientException e) {
|
||||||
|
// Amazon S3 couldn't be contacted for a response, or the client
|
||||||
|
// couldn't parse the response from Amazon S3.
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -18,7 +18,7 @@ import java.io.File;
|
|||||||
/**
|
/**
|
||||||
* Hello world!
|
* Hello world!
|
||||||
*/
|
*/
|
||||||
public class S3Copy {
|
public class PutObject {
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
|
|
||||||
AWSCredentials credentials = new BasicAWSCredentials("ANY-ACCESSKEYID", "ANY-SECRETACCESSKEY");
|
AWSCredentials credentials = new BasicAWSCredentials("ANY-ACCESSKEYID", "ANY-SECRETACCESSKEY");
|
||||||
@@ -7,7 +7,7 @@ import junit.framework.TestSuite;
|
|||||||
/**
|
/**
|
||||||
* Unit test for simple App.
|
* Unit test for simple App.
|
||||||
*/
|
*/
|
||||||
public class S3CopyTest
|
public class PutObjectTest
|
||||||
extends TestCase
|
extends TestCase
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
@@ -15,7 +15,7 @@ public class S3CopyTest
|
|||||||
*
|
*
|
||||||
* @param testName name of the test case
|
* @param testName name of the test case
|
||||||
*/
|
*/
|
||||||
public S3CopyTest(String testName )
|
public PutObjectTest(String testName )
|
||||||
{
|
{
|
||||||
super( testName );
|
super( testName );
|
||||||
}
|
}
|
||||||
@@ -25,7 +25,7 @@ public class S3CopyTest
|
|||||||
*/
|
*/
|
||||||
public static Test suite()
|
public static Test suite()
|
||||||
{
|
{
|
||||||
return new TestSuite( S3CopyTest.class );
|
return new TestSuite( PutObjectTest.class );
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -15,7 +15,11 @@ import (
|
|||||||
"github.com/satori/go.uuid"
|
"github.com/satori/go.uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *s3.CreateMultipartUploadOutput, code ErrorCode) {
|
type InitiateMultipartUploadResult struct {
|
||||||
|
s3.CreateMultipartUploadOutput
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code ErrorCode) {
|
||||||
uploadId, _ := uuid.NewV4()
|
uploadId, _ := uuid.NewV4()
|
||||||
uploadIdString := uploadId.String()
|
uploadIdString := uploadId.String()
|
||||||
|
|
||||||
@@ -29,16 +33,22 @@ func (s3a *S3ApiServer) createMultipartUpload(input *s3.CreateMultipartUploadInp
|
|||||||
return nil, ErrInternalError
|
return nil, ErrInternalError
|
||||||
}
|
}
|
||||||
|
|
||||||
output = &s3.CreateMultipartUploadOutput{
|
output = &InitiateMultipartUploadResult{
|
||||||
Bucket: input.Bucket,
|
s3.CreateMultipartUploadOutput{
|
||||||
Key: input.Key,
|
Bucket: input.Bucket,
|
||||||
UploadId: aws.String(uploadIdString),
|
Key: input.Key,
|
||||||
|
UploadId: aws.String(uploadIdString),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *s3.CompleteMultipartUploadOutput, code ErrorCode) {
|
type CompleteMultipartUploadResult struct {
|
||||||
|
s3.CompleteMultipartUploadOutput
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploadInput) (output *CompleteMultipartUploadResult, code ErrorCode) {
|
||||||
|
|
||||||
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
|
||||||
|
|
||||||
@@ -54,13 +64,14 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
|||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
if strings.HasSuffix(entry.Name, ".part") && !entry.IsDirectory {
|
||||||
for _, chunk := range entry.Chunks {
|
for _, chunk := range entry.Chunks {
|
||||||
finalParts = append(finalParts, &filer_pb.FileChunk{
|
p := &filer_pb.FileChunk{
|
||||||
FileId: chunk.FileId,
|
FileId: chunk.FileId,
|
||||||
Offset: offset,
|
Offset: offset,
|
||||||
Size: chunk.Size,
|
Size: chunk.Size,
|
||||||
Mtime: chunk.Mtime,
|
Mtime: chunk.Mtime,
|
||||||
ETag: chunk.ETag,
|
ETag: chunk.ETag,
|
||||||
})
|
}
|
||||||
|
finalParts = append(finalParts, p)
|
||||||
offset += int64(chunk.Size)
|
offset += int64(chunk.Size)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -71,6 +82,9 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
|||||||
if dirName == "." {
|
if dirName == "." {
|
||||||
dirName = ""
|
dirName = ""
|
||||||
}
|
}
|
||||||
|
if strings.HasPrefix(dirName, "/") {
|
||||||
|
dirName = dirName[1:]
|
||||||
|
}
|
||||||
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
|
dirName = fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, dirName)
|
||||||
|
|
||||||
err = s3a.mkFile(dirName, entryName, finalParts)
|
err = s3a.mkFile(dirName, entryName, finalParts)
|
||||||
@@ -80,10 +94,12 @@ func (s3a *S3ApiServer) completeMultipartUpload(input *s3.CompleteMultipartUploa
|
|||||||
return nil, ErrInternalError
|
return nil, ErrInternalError
|
||||||
}
|
}
|
||||||
|
|
||||||
output = &s3.CompleteMultipartUploadOutput{
|
output = &CompleteMultipartUploadResult{
|
||||||
Bucket: input.Bucket,
|
s3.CompleteMultipartUploadOutput{
|
||||||
ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""),
|
Bucket: input.Bucket,
|
||||||
Key: input.Key,
|
ETag: aws.String("\"" + filer2.ETag(finalParts) + "\""),
|
||||||
|
Key: input.Key,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
@@ -107,15 +123,21 @@ func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput
|
|||||||
return &s3.AbortMultipartUploadOutput{}, ErrNone
|
return &s3.AbortMultipartUploadOutput{}, ErrNone
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *s3.ListMultipartUploadsOutput, code ErrorCode) {
|
type ListMultipartUploadsResult struct {
|
||||||
|
s3.ListMultipartUploadsOutput
|
||||||
|
}
|
||||||
|
|
||||||
output = &s3.ListMultipartUploadsOutput{
|
func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code ErrorCode) {
|
||||||
Bucket: input.Bucket,
|
|
||||||
Delimiter: input.Delimiter,
|
output = &ListMultipartUploadsResult{
|
||||||
EncodingType: input.EncodingType,
|
s3.ListMultipartUploadsOutput{
|
||||||
KeyMarker: input.KeyMarker,
|
Bucket: input.Bucket,
|
||||||
MaxUploads: input.MaxUploads,
|
Delimiter: input.Delimiter,
|
||||||
Prefix: input.Prefix,
|
EncodingType: input.EncodingType,
|
||||||
|
KeyMarker: input.KeyMarker,
|
||||||
|
MaxUploads: input.MaxUploads,
|
||||||
|
Prefix: input.Prefix,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
|
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket), *input.Prefix, *input.KeyMarker, true, int(*input.MaxUploads))
|
||||||
@@ -136,13 +158,19 @@ func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *s3.ListPartsOutput, code ErrorCode) {
|
type ListPartsResult struct {
|
||||||
output = &s3.ListPartsOutput{
|
s3.ListPartsOutput
|
||||||
Bucket: input.Bucket,
|
}
|
||||||
Key: input.Key,
|
|
||||||
UploadId: input.UploadId,
|
func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code ErrorCode) {
|
||||||
MaxParts: input.MaxParts, // the maximum number of parts to return.
|
output = &ListPartsResult{
|
||||||
PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
|
s3.ListPartsOutput{
|
||||||
|
Bucket: input.Bucket,
|
||||||
|
Key: input.Key,
|
||||||
|
UploadId: input.UploadId,
|
||||||
|
MaxParts: input.MaxParts, // the maximum number of parts to return.
|
||||||
|
PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId,
|
entries, err := s3a.list(s3a.genUploadsFolder(*input.Bucket)+"/"+*input.UploadId,
|
||||||
|
|||||||
@@ -78,7 +78,7 @@ func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, incl
|
|||||||
err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
err = s3a.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
|
||||||
|
|
||||||
request := &filer_pb.ListEntriesRequest{
|
request := &filer_pb.ListEntriesRequest{
|
||||||
Directory: s3a.option.BucketsPath,
|
Directory: parentDirectoryPath,
|
||||||
Prefix: prefix,
|
Prefix: prefix,
|
||||||
StartFromFileName: startFrom,
|
StartFromFileName: startFrom,
|
||||||
InclusiveStartFrom: inclusive,
|
InclusiveStartFrom: inclusive,
|
||||||
@@ -135,7 +135,7 @@ func (s3a *S3ApiServer) exists(parentDirectoryPath string, entryName string, isD
|
|||||||
Name: entryName,
|
Name: entryName,
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(1).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
|
glog.V(4).Infof("exists entry %v/%v: %v", parentDirectoryPath, entryName, request)
|
||||||
resp, err := client.LookupDirectoryEntry(ctx, request)
|
resp, err := client.LookupDirectoryEntry(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
|
return fmt.Errorf("exists entry %s/%s: %v", parentDirectoryPath, entryName, err)
|
||||||
|
|||||||
@@ -40,9 +40,7 @@ const (
|
|||||||
ErrInvalidMaxParts
|
ErrInvalidMaxParts
|
||||||
ErrInvalidPartNumberMarker
|
ErrInvalidPartNumberMarker
|
||||||
ErrInvalidPart
|
ErrInvalidPart
|
||||||
ErrInvalidPartOrder
|
|
||||||
ErrInternalError
|
ErrInternalError
|
||||||
ErrMalformedXML
|
|
||||||
ErrNotImplemented
|
ErrNotImplemented
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -114,21 +112,12 @@ var errorCodeResponse = map[ErrorCode]APIError{
|
|||||||
Description: "We encountered an internal error, please try again.",
|
Description: "We encountered an internal error, please try again.",
|
||||||
HTTPStatusCode: http.StatusInternalServerError,
|
HTTPStatusCode: http.StatusInternalServerError,
|
||||||
},
|
},
|
||||||
ErrMalformedXML: {
|
|
||||||
Code: "MalformedXML",
|
|
||||||
Description: "The XML you provided was not well-formed or did not validate against our published schema.",
|
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
|
||||||
},
|
|
||||||
ErrInvalidPart: {
|
ErrInvalidPart: {
|
||||||
Code: "InvalidPart",
|
Code: "InvalidPart",
|
||||||
Description: "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
|
Description: "One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag.",
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
},
|
},
|
||||||
ErrInvalidPartOrder: {
|
|
||||||
Code: "InvalidPartOrder",
|
|
||||||
Description: "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
|
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
|
||||||
},
|
|
||||||
ErrNotImplemented: {
|
ErrNotImplemented: {
|
||||||
Code: "NotImplemented",
|
Code: "NotImplemented",
|
||||||
Description: "A header you provided implies functionality that is not implemented",
|
Description: "A header you provided implies functionality that is not implemented",
|
||||||
|
|||||||
@@ -3,12 +3,15 @@ package s3api
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
|
||||||
"github.com/gorilla/mux"
|
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/chrislusf/seaweedfs/weed/server"
|
||||||
|
"crypto/md5"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -21,19 +24,13 @@ func init() {
|
|||||||
}}
|
}}
|
||||||
}
|
}
|
||||||
|
|
||||||
type UploadResult struct {
|
|
||||||
Name string `json:"name,omitempty"`
|
|
||||||
Size uint32 `json:"size,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
|
// http://docs.aws.amazon.com/AmazonS3/latest/dev/UploadingObjects.html
|
||||||
|
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
bucket := vars["bucket"]
|
bucket := vars["bucket"]
|
||||||
object := vars["object"]
|
object := getObject(vars)
|
||||||
|
|
||||||
_, err := validateContentMd5(r.Header)
|
_, err := validateContentMd5(r.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -47,7 +44,7 @@ func (s3a *S3ApiServer) PutObjectHandler(w http.ResponseWriter, r *http.Request)
|
|||||||
dataReader = newSignV4ChunkedReader(r)
|
dataReader = newSignV4ChunkedReader(r)
|
||||||
}
|
}
|
||||||
|
|
||||||
uploadUrl := fmt.Sprintf("http://%s%s/%s/%s?collection=%s",
|
uploadUrl := fmt.Sprintf("http://%s%s/%s%s?collection=%s",
|
||||||
s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
|
s3a.option.Filer, s3a.option.BucketsPath, bucket, object, bucket)
|
||||||
|
|
||||||
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
|
etag, errCode := s3a.putToFiler(r, uploadUrl, dataReader)
|
||||||
@@ -147,7 +144,10 @@ func passThroghResponse(proxyResonse *http.Response, w http.ResponseWriter) {
|
|||||||
|
|
||||||
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) {
|
func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader io.ReadCloser) (etag string, code ErrorCode) {
|
||||||
|
|
||||||
proxyReq, err := http.NewRequest("PUT", uploadUrl, dataReader)
|
hash := md5.New()
|
||||||
|
var body io.Reader = io.TeeReader(dataReader, hash)
|
||||||
|
|
||||||
|
proxyReq, err := http.NewRequest("PUT", uploadUrl, body)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
|
glog.Errorf("NewRequest %s: %v", uploadUrl, err)
|
||||||
@@ -165,28 +165,30 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
|
|||||||
|
|
||||||
resp, postErr := client.Do(proxyReq)
|
resp, postErr := client.Do(proxyReq)
|
||||||
|
|
||||||
|
dataReader.Close()
|
||||||
|
|
||||||
if postErr != nil {
|
if postErr != nil {
|
||||||
glog.Errorf("post to filer: %v", postErr)
|
glog.Errorf("post to filer: %v", postErr)
|
||||||
return "", ErrInternalError
|
return "", ErrInternalError
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
etag = resp.Header.Get("ETag")
|
etag = fmt.Sprintf("%x", hash.Sum(nil))
|
||||||
|
|
||||||
resp_body, ra_err := ioutil.ReadAll(resp.Body)
|
resp_body, ra_err := ioutil.ReadAll(resp.Body)
|
||||||
if ra_err != nil {
|
if ra_err != nil {
|
||||||
glog.Errorf("upload to filer response read: %v", ra_err)
|
glog.Errorf("upload to filer response read: %v", ra_err)
|
||||||
return etag, ErrInternalError
|
return etag, ErrInternalError
|
||||||
}
|
}
|
||||||
var ret UploadResult
|
var ret weed_server.FilerPostResult
|
||||||
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
||||||
if unmarshal_err != nil {
|
if unmarshal_err != nil {
|
||||||
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
|
glog.Errorf("failing to read upload to %s : %v", uploadUrl, string(resp_body))
|
||||||
return etag, ErrInternalError
|
return "", ErrInternalError
|
||||||
}
|
}
|
||||||
if ret.Error != "" {
|
if ret.Error != "" {
|
||||||
glog.Errorf("upload to filer error: %v", ret.Error)
|
glog.Errorf("upload to filer error: %v", ret.Error)
|
||||||
return etag, ErrInternalError
|
return "", ErrInternalError
|
||||||
}
|
}
|
||||||
|
|
||||||
return etag, ErrNone
|
return etag, ErrNone
|
||||||
@@ -194,6 +196,26 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
|
|||||||
|
|
||||||
func setEtag(w http.ResponseWriter, etag string) {
|
func setEtag(w http.ResponseWriter, etag string) {
|
||||||
if etag != "" {
|
if etag != "" {
|
||||||
w.Header().Set("ETag", "\""+etag+"\"")
|
if strings.HasPrefix(etag, "\"") {
|
||||||
|
w.Header().Set("ETag", etag)
|
||||||
|
} else {
|
||||||
|
w.Header().Set("ETag", "\""+etag+"\"")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getObject(vars map[string]string) string {
|
||||||
|
object := vars["object"]
|
||||||
|
if !strings.HasPrefix(object, "/") {
|
||||||
|
object = "/" + object
|
||||||
|
}
|
||||||
|
return object
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEtag(r *http.Request) (etag string) {
|
||||||
|
etag = r.Header.Get("ETag")
|
||||||
|
if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
|
||||||
|
etag = etag[1 : len(etag)-1]
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,15 +1,12 @@
|
|||||||
package s3api
|
package s3api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/xml"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sort"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
@@ -38,6 +35,8 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// println("NewMultipartUploadHandler", string(encodeResponse(response)))
|
||||||
|
|
||||||
writeSuccessResponseXML(w, encodeResponse(response))
|
writeSuccessResponseXML(w, encodeResponse(response))
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -46,37 +45,19 @@ func (s3a *S3ApiServer) NewMultipartUploadHandler(w http.ResponseWriter, r *http
|
|||||||
func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
bucket := vars["bucket"]
|
bucket := vars["bucket"]
|
||||||
object := vars["object"]
|
object := getObject(vars)
|
||||||
|
|
||||||
// Get upload id.
|
// Get upload id.
|
||||||
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
||||||
|
|
||||||
completeMultipartBytes, err := ioutil.ReadAll(r.Body)
|
|
||||||
if err != nil {
|
|
||||||
writeErrorResponse(w, ErrInternalError, r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
completedMultipartUpload := &s3.CompletedMultipartUpload{}
|
|
||||||
if err = xml.Unmarshal(completeMultipartBytes, completedMultipartUpload); err != nil {
|
|
||||||
writeErrorResponse(w, ErrMalformedXML, r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if len(completedMultipartUpload.Parts) == 0 {
|
|
||||||
writeErrorResponse(w, ErrMalformedXML, r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !sort.IsSorted(byCompletedPartNumber(completedMultipartUpload.Parts)) {
|
|
||||||
writeErrorResponse(w, ErrInvalidPartOrder, r.URL)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
|
response, errCode := s3a.completeMultipartUpload(&s3.CompleteMultipartUploadInput{
|
||||||
Bucket: aws.String(bucket),
|
Bucket: aws.String(bucket),
|
||||||
Key: aws.String(object),
|
Key: aws.String(object),
|
||||||
MultipartUpload: completedMultipartUpload,
|
UploadId: aws.String(uploadID),
|
||||||
UploadId: aws.String(uploadID),
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// println("CompleteMultipartUploadHandler", string(encodeResponse(response)), errCode)
|
||||||
|
|
||||||
if errCode != ErrNone {
|
if errCode != ErrNone {
|
||||||
writeErrorResponse(w, errCode, r.URL)
|
writeErrorResponse(w, errCode, r.URL)
|
||||||
return
|
return
|
||||||
@@ -90,7 +71,7 @@ func (s3a *S3ApiServer) CompleteMultipartUploadHandler(w http.ResponseWriter, r
|
|||||||
func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
bucket := vars["bucket"]
|
bucket := vars["bucket"]
|
||||||
object := vars["object"]
|
object := getObject(vars)
|
||||||
|
|
||||||
// Get upload id.
|
// Get upload id.
|
||||||
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
uploadID, _, _, _ := getObjectResources(r.URL.Query())
|
||||||
@@ -106,6 +87,8 @@ func (s3a *S3ApiServer) AbortMultipartUploadHandler(w http.ResponseWriter, r *ht
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// println("AbortMultipartUploadHandler", string(encodeResponse(response)))
|
||||||
|
|
||||||
writeSuccessResponseXML(w, encodeResponse(response))
|
writeSuccessResponseXML(w, encodeResponse(response))
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -144,6 +127,7 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO handle encodingType
|
// TODO handle encodingType
|
||||||
|
// println("ListMultipartUploadsHandler", string(encodeResponse(response)))
|
||||||
|
|
||||||
writeSuccessResponseXML(w, encodeResponse(response))
|
writeSuccessResponseXML(w, encodeResponse(response))
|
||||||
}
|
}
|
||||||
@@ -152,7 +136,7 @@ func (s3a *S3ApiServer) ListMultipartUploadsHandler(w http.ResponseWriter, r *ht
|
|||||||
func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
|
func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
vars := mux.Vars(r)
|
vars := mux.Vars(r)
|
||||||
bucket := vars["bucket"]
|
bucket := vars["bucket"]
|
||||||
object := vars["object"]
|
object := getObject(vars)
|
||||||
|
|
||||||
uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
|
uploadID, partNumberMarker, maxParts, _ := getObjectResources(r.URL.Query())
|
||||||
if partNumberMarker < 0 {
|
if partNumberMarker < 0 {
|
||||||
@@ -177,6 +161,8 @@ func (s3a *S3ApiServer) ListObjectPartsHandler(w http.ResponseWriter, r *http.Re
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// println("ListObjectPartsHandler", string(encodeResponse(response)))
|
||||||
|
|
||||||
writeSuccessResponseXML(w, encodeResponse(response))
|
writeSuccessResponseXML(w, encodeResponse(response))
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,11 +43,6 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
|||||||
|
|
||||||
for _, bucket := range routers {
|
for _, bucket := range routers {
|
||||||
|
|
||||||
// PutObject
|
|
||||||
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
|
|
||||||
// PutBucket
|
|
||||||
bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler)
|
|
||||||
|
|
||||||
// HeadObject
|
// HeadObject
|
||||||
bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(s3a.HeadObjectHandler)
|
bucket.Methods("HEAD").Path("/{object:.+}").HandlerFunc(s3a.HeadObjectHandler)
|
||||||
// HeadBucket
|
// HeadBucket
|
||||||
@@ -66,6 +61,11 @@ func (s3a *S3ApiServer) registerRouter(router *mux.Router) {
|
|||||||
// ListMultipartUploads
|
// ListMultipartUploads
|
||||||
bucket.Methods("GET").HandlerFunc(s3a.ListMultipartUploadsHandler).Queries("uploads", "")
|
bucket.Methods("GET").HandlerFunc(s3a.ListMultipartUploadsHandler).Queries("uploads", "")
|
||||||
|
|
||||||
|
// PutObject
|
||||||
|
bucket.Methods("PUT").Path("/{object:.+}").HandlerFunc(s3a.PutObjectHandler)
|
||||||
|
// PutBucket
|
||||||
|
bucket.Methods("PUT").HandlerFunc(s3a.PutBucketHandler)
|
||||||
|
|
||||||
// DeleteObject
|
// DeleteObject
|
||||||
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(s3a.DeleteObjectHandler)
|
bucket.Methods("DELETE").Path("/{object:.+}").HandlerFunc(s3a.DeleteObjectHandler)
|
||||||
// DeleteBucket
|
// DeleteBucket
|
||||||
|
|||||||
@@ -117,7 +117,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(0).Infof("request header %+v, urlLocation: %v", r.Header, urlLocation)
|
glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
|
||||||
|
|
||||||
u, _ := url.Parse(urlLocation)
|
u, _ := url.Parse(urlLocation)
|
||||||
|
|
||||||
@@ -221,6 +221,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
Fid: fileId,
|
Fid: fileId,
|
||||||
Url: urlLocation,
|
Url: urlLocation,
|
||||||
}
|
}
|
||||||
|
setEtag(w, etag)
|
||||||
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
writeJsonQuiet(w, r, http.StatusCreated, reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,13 +4,14 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/chrislusf/seaweedfs/weed/glog"
|
"github.com/chrislusf/seaweedfs/weed/glog"
|
||||||
"github.com/chrislusf/seaweedfs/weed/operation"
|
"github.com/chrislusf/seaweedfs/weed/operation"
|
||||||
"github.com/chrislusf/seaweedfs/weed/storage"
|
"github.com/chrislusf/seaweedfs/weed/storage"
|
||||||
"github.com/chrislusf/seaweedfs/weed/topology"
|
"github.com/chrislusf/seaweedfs/weed/topology"
|
||||||
"strconv"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -175,6 +176,10 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques
|
|||||||
|
|
||||||
func setEtag(w http.ResponseWriter, etag string) {
|
func setEtag(w http.ResponseWriter, etag string) {
|
||||||
if etag != "" {
|
if etag != "" {
|
||||||
w.Header().Set("ETag", "\""+etag+"\"")
|
if strings.HasPrefix(etag, "\"") {
|
||||||
|
w.Header().Set("ETag", etag)
|
||||||
|
} else {
|
||||||
|
w.Header().Set("ETag", "\""+etag+"\"")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user