Files
seaweedFS/weed/operation/upload_content.go
Chris Lu 8d6bcddf60 Add S3 volume encryption support with -s3.encryptVolumeData flag (#7890)
* Add S3 volume encryption support with -s3.encryptVolumeData flag

This change adds volume-level encryption support for S3 uploads, similar
to the existing -filer.encryptVolumeData option. Each chunk is encrypted
with its own auto-generated CipherKey when the flag is enabled.

Changes:
- Add -s3.encryptVolumeData flag to weed s3, weed server, and weed mini
- Wire Cipher option through S3ApiServer and ChunkedUploadOption
- Add integration tests for multi-chunk range reads with encryption
- Tests verify encryption works across chunk boundaries

Usage:
  weed s3 -encryptVolumeData
  weed server -s3 -s3.encryptVolumeData
  weed mini -s3.encryptVolumeData

Integration tests:
  go test -v -tags=integration -timeout 5m ./test/s3/sse/...

* Add GitHub Actions CI for S3 volume encryption tests

- Add test-volume-encryption target to Makefile that starts server with -s3.encryptVolumeData
- Add s3-volume-encryption job to GitHub Actions workflow
- Tests run with integration build tag and 10m timeout
- Server logs uploaded on failure for debugging

* Fix S3 client credentials to use environment variables

The test was using hardcoded credentials "any"/"any" but the Makefile
sets AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to "some_access_key1"/
"some_secret_key1". Updated getS3Client() to read from environment
variables with fallback to "any"/"any" for manual testing.

* Change bucket creation errors from skip to fatal

Tests should fail, not skip, when bucket creation fails. This ensures
that credential mismatches and other configuration issues are caught
rather than silently skipped.

* Make copy and multipart test jobs fail instead of succeed

Changed exit 0 to exit 1 for s3-sse-copy-operations and s3-sse-multipart
jobs. These jobs document known limitations but should fail to ensure
the issues are tracked and addressed, not silently ignored.

* Hardcode S3 credentials to match Makefile

Changed from environment variables to hardcoded credentials
"some_access_key1"/"some_secret_key1" to match the Makefile
configuration. This ensures tests work reliably.

* fix Double Encryption

* fix Chunk Size Mismatch

* Added IsCompressed

* is gzipped

* fix copying

* only perform HEAD request when len(cipherKey) > 0

* Revert "Make copy and multipart test jobs fail instead of succeed"

This reverts commit bc34a7eb3c103ae7ab2000da2a6c3925712eb226.

* fix security vulnerability

* fix security

* Update s3api_object_handlers_copy.go

* Update s3api_object_handlers_copy.go

* jwt to get content length
2025-12-27 00:09:14 -08:00

440 lines
13 KiB
Go

package operation
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime"
"mime/multipart"
"net/http"
"net/textproto"
"path/filepath"
"strings"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/util/request_id"
"github.com/valyala/bytebufferpool"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
)
type UploadOption struct {
UploadUrl string
Filename string
Cipher bool
IsInputCompressed bool
MimeType string
PairMap map[string]string
Jwt security.EncodedJwt
RetryForever bool
Md5 string
BytesBuffer *bytes.Buffer
}
type UploadResult struct {
Name string `json:"name,omitempty"`
Size uint32 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
ETag string `json:"eTag,omitempty"`
CipherKey []byte `json:"cipherKey,omitempty"`
Mime string `json:"mime,omitempty"`
Gzip uint32 `json:"gzip,omitempty"`
ContentMd5 string `json:"contentMd5,omitempty"`
RetryCount int `json:"-"`
}
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk {
fid, _ := filer_pb.ToFileIdObject(fileId)
return &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
Size: uint64(uploadResult.Size),
ModifiedTsNs: tsNs,
ETag: uploadResult.ContentMd5,
CipherKey: uploadResult.CipherKey,
IsCompressed: uploadResult.Gzip > 0,
Fid: fid,
}
}
// ToPbFileChunkWithSSE creates a FileChunk with SSE metadata
func (uploadResult *UploadResult) ToPbFileChunkWithSSE(fileId string, offset int64, tsNs int64, sseType filer_pb.SSEType, sseMetadata []byte) *filer_pb.FileChunk {
fid, _ := filer_pb.ToFileIdObject(fileId)
chunk := &filer_pb.FileChunk{
FileId: fileId,
Offset: offset,
Size: uint64(uploadResult.Size),
ModifiedTsNs: tsNs,
ETag: uploadResult.ContentMd5,
CipherKey: uploadResult.CipherKey,
IsCompressed: uploadResult.Gzip > 0,
Fid: fid,
}
// Add SSE metadata if provided
chunk.SseType = sseType
if len(sseMetadata) > 0 {
chunk.SseMetadata = sseMetadata
}
return chunk
}
var (
uploader *Uploader
uploaderErr error
once sync.Once
)
// HTTPClient interface for testing
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}
// Uploader
type Uploader struct {
httpClient HTTPClient
}
func NewUploader() (*Uploader, error) {
once.Do(func() {
// With Dial context
var httpClient *util_http_client.HTTPClient
httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext)
if uploaderErr != nil {
uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr)
}
if httpClient != nil {
uploader = newUploader(httpClient)
}
})
return uploader, uploaderErr
}
func newUploader(httpClient HTTPClient) *Uploader {
return &Uploader{
httpClient: httpClient,
}
}
// UploadWithRetry will retry both assigning volume request and uploading content
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
doUploadFunc := func() error {
var host string
var auth security.EncodedJwt
// grpc assign volume
if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
if assignErr != nil {
glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr)
return assignErr
}
if resp.Error != "" {
return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error)
}
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
loc := resp.Location
host = filerClient.AdjustedUrl(loc)
return nil
}); grpcAssignErr != nil {
return fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr)
}
uploadOption.UploadUrl = genFileUrlFn(host, fileId)
uploadOption.Jwt = auth
var uploadErr error
uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
return uploadErr
}
if uploadOption.RetryForever {
util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
glog.V(0).Infof("upload content: %v", err)
return true
})
} else {
uploadErrList := []string{"transport", "is read only"}
err = util.MultiRetry("uploadWithRetry", uploadErrList, doUploadFunc)
}
return
}
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
uploadResult, err = uploader.retriedUploadData(ctx, data, option)
return
}
// Upload sends a POST request to a volume server to upload the content with fast compression
func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
uploadResult, err, data = uploader.doUpload(ctx, reader, option)
return
}
func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
bytesReader, ok := reader.(*util.BytesReader)
if ok {
data = bytesReader.Bytes
} else {
data, err = io.ReadAll(reader)
if err != nil {
err = fmt.Errorf("read input: %w", err)
return
}
}
uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
return uploadResult, uploadErr, data
}
func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
for i := 0; i < 3; i++ {
if i > 0 {
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
}
uploadResult, err = uploader.doUploadData(ctx, data, option)
if err == nil {
uploadResult.RetryCount = i
return
}
glog.WarningfCtx(ctx, "uploading %d to %s: %v", i, option.UploadUrl, err)
}
return
}
func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
contentIsGzipped := option.IsInputCompressed
shouldGzipNow := false
if !option.IsInputCompressed {
if option.MimeType == "" {
option.MimeType = http.DetectContentType(data)
// println("detect1 mimetype to", MimeType)
if option.MimeType == "application/octet-stream" {
option.MimeType = ""
}
}
if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(option.Filename), option.MimeType); iAmSure && shouldBeCompressed {
shouldGzipNow = true
} else if !iAmSure && option.MimeType == "" && len(data) > 16*1024 {
var compressed []byte
compressed, err = util.GzipData(data[0:128])
if err != nil {
return
}
shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
}
}
var clearDataLen int
// gzip if possible
// this could be double copying
clearDataLen = len(data)
clearData := data
if shouldGzipNow {
compressed, compressErr := util.GzipData(data)
// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
if compressErr == nil {
if len(compressed) < len(data) {
data = compressed
contentIsGzipped = true
}
}
} else if option.IsInputCompressed {
// just to get the clear data length
clearData, err = util.DecompressData(data)
if err == nil {
clearDataLen = len(clearData)
}
}
if option.Cipher {
// encrypt(gzip(data))
// encrypt
cipherKey := util.GenCipherKey()
encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
if encryptionErr != nil {
err = fmt.Errorf("encrypt input: %w", encryptionErr)
return
}
// upload data
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(encryptedData)
return
}, len(encryptedData), &UploadOption{
UploadUrl: option.UploadUrl,
Filename: "",
Cipher: false,
IsInputCompressed: false,
MimeType: "",
PairMap: nil,
Jwt: option.Jwt,
})
if uploadResult == nil {
return
}
uploadResult.Name = option.Filename
uploadResult.Mime = option.MimeType
uploadResult.CipherKey = cipherKey
uploadResult.Size = uint32(len(data))
if contentIsGzipped {
uploadResult.Gzip = 1
}
} else {
// upload data
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
_, err = w.Write(data)
return
}, len(data), &UploadOption{
UploadUrl: option.UploadUrl,
Filename: option.Filename,
Cipher: false,
IsInputCompressed: contentIsGzipped,
MimeType: option.MimeType,
PairMap: option.PairMap,
Jwt: option.Jwt,
Md5: option.Md5,
BytesBuffer: option.BytesBuffer,
})
if uploadResult == nil {
return
}
uploadResult.Size = uint32(clearDataLen)
if contentIsGzipped {
uploadResult.Gzip = 1
}
}
return uploadResult, err
}
func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
var body_writer *multipart.Writer
var reqReader *bytes.Reader
var buf *bytebufferpool.ByteBuffer
if option.BytesBuffer == nil {
buf = GetBuffer()
defer PutBuffer(buf)
body_writer = multipart.NewWriter(buf)
} else {
option.BytesBuffer.Reset()
body_writer = multipart.NewWriter(option.BytesBuffer)
}
h := make(textproto.MIMEHeader)
// Use mime.FormatMediaType for RFC 6266 compliant Content-Disposition,
// properly handling non-ASCII characters and special characters
h.Set("Content-Disposition", mime.FormatMediaType("form-data", map[string]string{"name": "file", "filename": option.Filename}))
h.Set("Idempotency-Key", option.UploadUrl)
if option.MimeType == "" {
option.MimeType = mime.TypeByExtension(strings.ToLower(filepath.Ext(option.Filename)))
}
if option.MimeType != "" {
h.Set("Content-Type", option.MimeType)
}
if option.IsInputCompressed {
h.Set("Content-Encoding", "gzip")
}
if option.Md5 != "" {
h.Set("Content-MD5", option.Md5)
}
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
glog.V(0).InfolnCtx(ctx, "error creating form file", cp_err.Error())
return nil, cp_err
}
if err := fillBufferFunction(file_writer); err != nil {
glog.V(0).InfolnCtx(ctx, "error copying data", err)
return nil, err
}
content_type := body_writer.FormDataContentType()
if err := body_writer.Close(); err != nil {
glog.V(0).InfolnCtx(ctx, "error closing body", err)
return nil, err
}
if option.BytesBuffer == nil {
reqReader = bytes.NewReader(buf.Bytes())
} else {
reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
}
req, postErr := http.NewRequestWithContext(ctx, http.MethodPost, option.UploadUrl, reqReader)
if postErr != nil {
glog.V(1).InfofCtx(ctx, "create upload request %s: %v", option.UploadUrl, postErr)
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
}
req.Header.Set("Content-Type", content_type)
for k, v := range option.PairMap {
req.Header.Set(k, v)
}
if option.Jwt != "" {
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
}
request_id.InjectToRequest(ctx, req)
// print("+")
resp, post_err := uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)
if post_err != nil {
if strings.Contains(post_err.Error(), "connection reset by peer") ||
strings.Contains(post_err.Error(), "use of closed network connection") {
glog.V(1).InfofCtx(ctx, "repeat error upload request %s: %v", option.UploadUrl, postErr)
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
resp, post_err = uploader.httpClient.Do(req)
defer util_http.CloseResponse(resp)
}
}
if post_err != nil {
return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err)
}
// print("-")
var ret UploadResult
etag := getEtag(resp)
if resp.StatusCode == http.StatusNoContent {
ret.ETag = etag
return &ret, nil
}
resp_body, ra_err := io.ReadAll(resp.Body)
if ra_err != nil {
return nil, fmt.Errorf("read response body %v: %w", option.UploadUrl, ra_err)
}
unmarshal_err := json.Unmarshal(resp_body, &ret)
if unmarshal_err != nil {
glog.ErrorfCtx(ctx, "unmarshal %s: %v", option.UploadUrl, string(resp_body))
return nil, fmt.Errorf("unmarshal %v: %w", option.UploadUrl, unmarshal_err)
}
if ret.Error != "" {
return nil, fmt.Errorf("unmarshalled error %v: %v", option.UploadUrl, ret.Error)
}
ret.ETag = etag
ret.ContentMd5 = resp.Header.Get("Content-MD5")
return &ret, nil
}
func getEtag(r *http.Response) (etag string) {
etag = r.Header.Get("ETag")
if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
etag = etag[1 : len(etag)-1]
}
return
}