* Add S3 volume encryption support with -s3.encryptVolumeData flag This change adds volume-level encryption support for S3 uploads, similar to the existing -filer.encryptVolumeData option. Each chunk is encrypted with its own auto-generated CipherKey when the flag is enabled. Changes: - Add -s3.encryptVolumeData flag to weed s3, weed server, and weed mini - Wire Cipher option through S3ApiServer and ChunkedUploadOption - Add integration tests for multi-chunk range reads with encryption - Tests verify encryption works across chunk boundaries Usage: weed s3 -encryptVolumeData weed server -s3 -s3.encryptVolumeData weed mini -s3.encryptVolumeData Integration tests: go test -v -tags=integration -timeout 5m ./test/s3/sse/... * Add GitHub Actions CI for S3 volume encryption tests - Add test-volume-encryption target to Makefile that starts server with -s3.encryptVolumeData - Add s3-volume-encryption job to GitHub Actions workflow - Tests run with integration build tag and 10m timeout - Server logs uploaded on failure for debugging * Fix S3 client credentials to use environment variables The test was using hardcoded credentials "any"/"any" but the Makefile sets AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to "some_access_key1"/ "some_secret_key1". Updated getS3Client() to read from environment variables with fallback to "any"/"any" for manual testing. * Change bucket creation errors from skip to fatal Tests should fail, not skip, when bucket creation fails. This ensures that credential mismatches and other configuration issues are caught rather than silently skipped. * Make copy and multipart test jobs fail instead of succeed Changed exit 0 to exit 1 for s3-sse-copy-operations and s3-sse-multipart jobs. These jobs document known limitations but should fail to ensure the issues are tracked and addressed, not silently ignored. 
* Hardcode S3 credentials to match Makefile Changed from environment variables to hardcoded credentials "some_access_key1"/"some_secret_key1" to match the Makefile configuration. This ensures tests work reliably. * fix Double Encryption * fix Chunk Size Mismatch * Added IsCompressed * is gzipped * fix copying * only perform HEAD request when len(cipherKey) > 0 * Revert "Make copy and multipart test jobs fail instead of succeed" This reverts commit bc34a7eb3c103ae7ab2000da2a6c3925712eb226. * fix security vulnerability * fix security * Update s3api_object_handlers_copy.go * Update s3api_object_handlers_copy.go * jwt to get content length
440 lines
13 KiB
Go
440 lines
13 KiB
Go
package operation
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"mime"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/textproto"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util/request_id"
|
|
"github.com/valyala/bytebufferpool"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
|
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
|
|
)
|
|
|
|
type UploadOption struct {
|
|
UploadUrl string
|
|
Filename string
|
|
Cipher bool
|
|
IsInputCompressed bool
|
|
MimeType string
|
|
PairMap map[string]string
|
|
Jwt security.EncodedJwt
|
|
RetryForever bool
|
|
Md5 string
|
|
BytesBuffer *bytes.Buffer
|
|
}
|
|
|
|
// UploadResult is the outcome of one upload. Most fields are decoded from the
// JSON body returned by the volume server; the rest are filled in by this
// package after the request completes.
type UploadResult struct {
	// Name is the stored file name.
	Name string `json:"name,omitempty"`
	// Size is the size in bytes reported for the uploaded content.
	Size uint32 `json:"size,omitempty"`
	// Error is a server-reported failure message; a non-empty value is turned
	// into a Go error by upload_content.
	Error string `json:"error,omitempty"`
	// ETag is taken from the response ETag header with surrounding quotes removed.
	ETag string `json:"eTag,omitempty"`
	// CipherKey is the key used to encrypt the payload when encryption was requested.
	CipherKey []byte `json:"cipherKey,omitempty"`
	// Mime is the content type of the uploaded data.
	Mime string `json:"mime,omitempty"`
	// Gzip is set to a positive value when the stored content is gzip-compressed.
	Gzip uint32 `json:"gzip,omitempty"`
	// ContentMd5 is copied from the Content-MD5 response header.
	ContentMd5 string `json:"contentMd5,omitempty"`
	// RetryCount is the zero-based index of the attempt that succeeded; it is
	// never serialized.
	RetryCount int `json:"-"`
}
|
|
|
|
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk {
|
|
fid, _ := filer_pb.ToFileIdObject(fileId)
|
|
return &filer_pb.FileChunk{
|
|
FileId: fileId,
|
|
Offset: offset,
|
|
Size: uint64(uploadResult.Size),
|
|
ModifiedTsNs: tsNs,
|
|
ETag: uploadResult.ContentMd5,
|
|
CipherKey: uploadResult.CipherKey,
|
|
IsCompressed: uploadResult.Gzip > 0,
|
|
Fid: fid,
|
|
}
|
|
}
|
|
|
|
// ToPbFileChunkWithSSE creates a FileChunk with SSE metadata
|
|
func (uploadResult *UploadResult) ToPbFileChunkWithSSE(fileId string, offset int64, tsNs int64, sseType filer_pb.SSEType, sseMetadata []byte) *filer_pb.FileChunk {
|
|
fid, _ := filer_pb.ToFileIdObject(fileId)
|
|
chunk := &filer_pb.FileChunk{
|
|
FileId: fileId,
|
|
Offset: offset,
|
|
Size: uint64(uploadResult.Size),
|
|
ModifiedTsNs: tsNs,
|
|
ETag: uploadResult.ContentMd5,
|
|
CipherKey: uploadResult.CipherKey,
|
|
IsCompressed: uploadResult.Gzip > 0,
|
|
Fid: fid,
|
|
}
|
|
|
|
// Add SSE metadata if provided
|
|
chunk.SseType = sseType
|
|
if len(sseMetadata) > 0 {
|
|
chunk.SseMetadata = sseMetadata
|
|
}
|
|
|
|
return chunk
|
|
}
|
|
|
|
var (
|
|
uploader *Uploader
|
|
uploaderErr error
|
|
once sync.Once
|
|
)
|
|
|
|
// HTTPClient is the minimal HTTP transport the Uploader depends on. Keeping it
// as an interface lets tests substitute a stub for the real client.
type HTTPClient interface {
	// Do sends the request and returns the response or an error.
	Do(req *http.Request) (*http.Response, error)
}
|
|
|
|
// Uploader
|
|
type Uploader struct {
|
|
httpClient HTTPClient
|
|
}
|
|
|
|
func NewUploader() (*Uploader, error) {
|
|
once.Do(func() {
|
|
// With Dial context
|
|
var httpClient *util_http_client.HTTPClient
|
|
httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext)
|
|
if uploaderErr != nil {
|
|
uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr)
|
|
}
|
|
if httpClient != nil {
|
|
uploader = newUploader(httpClient)
|
|
}
|
|
})
|
|
return uploader, uploaderErr
|
|
}
|
|
|
|
func newUploader(httpClient HTTPClient) *Uploader {
|
|
return &Uploader{
|
|
httpClient: httpClient,
|
|
}
|
|
}
|
|
|
|
// UploadWithRetry will retry both assigning volume request and uploading content
|
|
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
|
|
func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
|
|
doUploadFunc := func() error {
|
|
|
|
var host string
|
|
var auth security.EncodedJwt
|
|
|
|
// grpc assign volume
|
|
if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
|
|
if assignErr != nil {
|
|
glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr)
|
|
return assignErr
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error)
|
|
}
|
|
|
|
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
|
|
loc := resp.Location
|
|
host = filerClient.AdjustedUrl(loc)
|
|
|
|
return nil
|
|
}); grpcAssignErr != nil {
|
|
return fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr)
|
|
}
|
|
|
|
uploadOption.UploadUrl = genFileUrlFn(host, fileId)
|
|
uploadOption.Jwt = auth
|
|
|
|
var uploadErr error
|
|
uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
|
|
return uploadErr
|
|
}
|
|
if uploadOption.RetryForever {
|
|
util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
|
|
glog.V(0).Infof("upload content: %v", err)
|
|
return true
|
|
})
|
|
} else {
|
|
uploadErrList := []string{"transport", "is read only"}
|
|
err = util.MultiRetry("uploadWithRetry", uploadErrList, doUploadFunc)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
|
|
func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
|
|
uploadResult, err = uploader.retriedUploadData(ctx, data, option)
|
|
return
|
|
}
|
|
|
|
// Upload sends a POST request to a volume server to upload the content with fast compression
|
|
func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
|
|
uploadResult, err, data = uploader.doUpload(ctx, reader, option)
|
|
return
|
|
}
|
|
|
|
func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
|
|
bytesReader, ok := reader.(*util.BytesReader)
|
|
if ok {
|
|
data = bytesReader.Bytes
|
|
} else {
|
|
data, err = io.ReadAll(reader)
|
|
if err != nil {
|
|
err = fmt.Errorf("read input: %w", err)
|
|
return
|
|
}
|
|
}
|
|
uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
|
|
return uploadResult, uploadErr, data
|
|
}
|
|
|
|
func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
|
|
for i := 0; i < 3; i++ {
|
|
if i > 0 {
|
|
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
|
|
}
|
|
uploadResult, err = uploader.doUploadData(ctx, data, option)
|
|
if err == nil {
|
|
uploadResult.RetryCount = i
|
|
return
|
|
}
|
|
glog.WarningfCtx(ctx, "uploading %d to %s: %v", i, option.UploadUrl, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
|
|
contentIsGzipped := option.IsInputCompressed
|
|
shouldGzipNow := false
|
|
if !option.IsInputCompressed {
|
|
if option.MimeType == "" {
|
|
option.MimeType = http.DetectContentType(data)
|
|
// println("detect1 mimetype to", MimeType)
|
|
if option.MimeType == "application/octet-stream" {
|
|
option.MimeType = ""
|
|
}
|
|
}
|
|
if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(option.Filename), option.MimeType); iAmSure && shouldBeCompressed {
|
|
shouldGzipNow = true
|
|
} else if !iAmSure && option.MimeType == "" && len(data) > 16*1024 {
|
|
var compressed []byte
|
|
compressed, err = util.GzipData(data[0:128])
|
|
if err != nil {
|
|
return
|
|
}
|
|
shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
|
|
}
|
|
}
|
|
|
|
var clearDataLen int
|
|
|
|
// gzip if possible
|
|
// this could be double copying
|
|
clearDataLen = len(data)
|
|
clearData := data
|
|
if shouldGzipNow {
|
|
compressed, compressErr := util.GzipData(data)
|
|
// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
|
|
if compressErr == nil {
|
|
if len(compressed) < len(data) {
|
|
data = compressed
|
|
contentIsGzipped = true
|
|
}
|
|
}
|
|
} else if option.IsInputCompressed {
|
|
// just to get the clear data length
|
|
clearData, err = util.DecompressData(data)
|
|
if err == nil {
|
|
clearDataLen = len(clearData)
|
|
}
|
|
}
|
|
|
|
if option.Cipher {
|
|
// encrypt(gzip(data))
|
|
|
|
// encrypt
|
|
cipherKey := util.GenCipherKey()
|
|
encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
|
|
if encryptionErr != nil {
|
|
err = fmt.Errorf("encrypt input: %w", encryptionErr)
|
|
return
|
|
}
|
|
|
|
// upload data
|
|
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
|
|
_, err = w.Write(encryptedData)
|
|
return
|
|
}, len(encryptedData), &UploadOption{
|
|
UploadUrl: option.UploadUrl,
|
|
Filename: "",
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: "",
|
|
PairMap: nil,
|
|
Jwt: option.Jwt,
|
|
})
|
|
if uploadResult == nil {
|
|
return
|
|
}
|
|
uploadResult.Name = option.Filename
|
|
uploadResult.Mime = option.MimeType
|
|
uploadResult.CipherKey = cipherKey
|
|
uploadResult.Size = uint32(len(data))
|
|
if contentIsGzipped {
|
|
uploadResult.Gzip = 1
|
|
}
|
|
} else {
|
|
// upload data
|
|
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
|
|
_, err = w.Write(data)
|
|
return
|
|
}, len(data), &UploadOption{
|
|
UploadUrl: option.UploadUrl,
|
|
Filename: option.Filename,
|
|
Cipher: false,
|
|
IsInputCompressed: contentIsGzipped,
|
|
MimeType: option.MimeType,
|
|
PairMap: option.PairMap,
|
|
Jwt: option.Jwt,
|
|
Md5: option.Md5,
|
|
BytesBuffer: option.BytesBuffer,
|
|
})
|
|
if uploadResult == nil {
|
|
return
|
|
}
|
|
uploadResult.Size = uint32(clearDataLen)
|
|
if contentIsGzipped {
|
|
uploadResult.Gzip = 1
|
|
}
|
|
}
|
|
|
|
return uploadResult, err
|
|
}
|
|
|
|
func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
|
|
var body_writer *multipart.Writer
|
|
var reqReader *bytes.Reader
|
|
var buf *bytebufferpool.ByteBuffer
|
|
if option.BytesBuffer == nil {
|
|
buf = GetBuffer()
|
|
defer PutBuffer(buf)
|
|
body_writer = multipart.NewWriter(buf)
|
|
} else {
|
|
option.BytesBuffer.Reset()
|
|
body_writer = multipart.NewWriter(option.BytesBuffer)
|
|
}
|
|
h := make(textproto.MIMEHeader)
|
|
// Use mime.FormatMediaType for RFC 6266 compliant Content-Disposition,
|
|
// properly handling non-ASCII characters and special characters
|
|
h.Set("Content-Disposition", mime.FormatMediaType("form-data", map[string]string{"name": "file", "filename": option.Filename}))
|
|
h.Set("Idempotency-Key", option.UploadUrl)
|
|
if option.MimeType == "" {
|
|
option.MimeType = mime.TypeByExtension(strings.ToLower(filepath.Ext(option.Filename)))
|
|
}
|
|
if option.MimeType != "" {
|
|
h.Set("Content-Type", option.MimeType)
|
|
}
|
|
if option.IsInputCompressed {
|
|
h.Set("Content-Encoding", "gzip")
|
|
}
|
|
if option.Md5 != "" {
|
|
h.Set("Content-MD5", option.Md5)
|
|
}
|
|
|
|
file_writer, cp_err := body_writer.CreatePart(h)
|
|
if cp_err != nil {
|
|
glog.V(0).InfolnCtx(ctx, "error creating form file", cp_err.Error())
|
|
return nil, cp_err
|
|
}
|
|
if err := fillBufferFunction(file_writer); err != nil {
|
|
glog.V(0).InfolnCtx(ctx, "error copying data", err)
|
|
return nil, err
|
|
}
|
|
content_type := body_writer.FormDataContentType()
|
|
if err := body_writer.Close(); err != nil {
|
|
glog.V(0).InfolnCtx(ctx, "error closing body", err)
|
|
return nil, err
|
|
}
|
|
if option.BytesBuffer == nil {
|
|
reqReader = bytes.NewReader(buf.Bytes())
|
|
} else {
|
|
reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
|
|
}
|
|
req, postErr := http.NewRequestWithContext(ctx, http.MethodPost, option.UploadUrl, reqReader)
|
|
if postErr != nil {
|
|
glog.V(1).InfofCtx(ctx, "create upload request %s: %v", option.UploadUrl, postErr)
|
|
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
|
|
}
|
|
req.Header.Set("Content-Type", content_type)
|
|
for k, v := range option.PairMap {
|
|
req.Header.Set(k, v)
|
|
}
|
|
if option.Jwt != "" {
|
|
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
|
|
}
|
|
|
|
request_id.InjectToRequest(ctx, req)
|
|
|
|
// print("+")
|
|
resp, post_err := uploader.httpClient.Do(req)
|
|
defer util_http.CloseResponse(resp)
|
|
if post_err != nil {
|
|
if strings.Contains(post_err.Error(), "connection reset by peer") ||
|
|
strings.Contains(post_err.Error(), "use of closed network connection") {
|
|
glog.V(1).InfofCtx(ctx, "repeat error upload request %s: %v", option.UploadUrl, postErr)
|
|
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
|
|
resp, post_err = uploader.httpClient.Do(req)
|
|
defer util_http.CloseResponse(resp)
|
|
}
|
|
}
|
|
if post_err != nil {
|
|
return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err)
|
|
}
|
|
// print("-")
|
|
|
|
var ret UploadResult
|
|
etag := getEtag(resp)
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
ret.ETag = etag
|
|
return &ret, nil
|
|
}
|
|
|
|
resp_body, ra_err := io.ReadAll(resp.Body)
|
|
if ra_err != nil {
|
|
return nil, fmt.Errorf("read response body %v: %w", option.UploadUrl, ra_err)
|
|
}
|
|
|
|
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
|
if unmarshal_err != nil {
|
|
glog.ErrorfCtx(ctx, "unmarshal %s: %v", option.UploadUrl, string(resp_body))
|
|
return nil, fmt.Errorf("unmarshal %v: %w", option.UploadUrl, unmarshal_err)
|
|
}
|
|
if ret.Error != "" {
|
|
return nil, fmt.Errorf("unmarshalled error %v: %v", option.UploadUrl, ret.Error)
|
|
}
|
|
ret.ETag = etag
|
|
ret.ContentMd5 = resp.Header.Get("Content-MD5")
|
|
return &ret, nil
|
|
}
|
|
|
|
// getEtag returns the value of the response's ETag header with one pair of
// surrounding double quotes removed. A value that is not fully quoted is
// returned unchanged, and a missing header yields the empty string.
//
// The length check matters: a header consisting of a single `"` has the quote
// as both prefix and suffix, and slicing it as etag[1:0] would panic.
func getEtag(r *http.Response) (etag string) {
	etag = r.Header.Get("ETag")
	if len(etag) >= 2 && strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
		etag = etag[1 : len(etag)-1]
	}
	return etag
}
|