* fix: correct chunk size in encrypted uploads When uploading with encryption enabled (-encryptVolumeData flag), the chunk metadata was incorrectly storing the encrypted data size instead of the original uncompressed size. This caused data corruption when reading files back because: 1. Encrypted data is larger than original data 2. After decryption and decompression, the actual data is smaller 3. Size validation in readEncryptedUrl would fail 4. Files appeared truncated with null bytes The fix ensures uploadResult.Size uses clearDataLen (the original uncompressed size) in both cipher and non-cipher code paths, matching the expected behavior. Fixes: #8151 * test: add comprehensive tests for encrypted upload size fix Add unit tests to verify that encrypted uploads correctly store the original uncompressed data size, not the encrypted size. Tests cover: - Cipher key generation and encryption/decryption roundtrip - Compression + encryption roundtrip with size preservation - Different data sizes - Compression detection and effectiveness - Size behavior demonstration showing the bug and fix All tests verify that metadata stores the original size (19 bytes in the key example) not the encrypted size (75 bytes), which is critical for preventing data corruption on read operations. Issue: #8151 * remove emojis * Update upload_content_test.go * Delete upload_content_test.go
440 lines
13 KiB
Go
440 lines
13 KiB
Go
package operation
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"mime"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"net/textproto"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util/request_id"
|
|
"github.com/valyala/bytebufferpool"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
"github.com/seaweedfs/seaweedfs/weed/stats"
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
|
|
util_http_client "github.com/seaweedfs/seaweedfs/weed/util/http/client"
|
|
)
|
|
|
|
type UploadOption struct {
|
|
UploadUrl string
|
|
Filename string
|
|
Cipher bool
|
|
IsInputCompressed bool
|
|
MimeType string
|
|
PairMap map[string]string
|
|
Jwt security.EncodedJwt
|
|
RetryForever bool
|
|
Md5 string
|
|
BytesBuffer *bytes.Buffer
|
|
}
|
|
|
|
// UploadResult describes the outcome of one upload. Most fields are decoded
// from the JSON body of the upload response; a few are filled in locally by
// the uploader afterwards.
type UploadResult struct {
	// Name is the file name associated with the upload.
	Name string `json:"name,omitempty"`
	// Size is overwritten by the uploader with the length of the clear
	// (uncompressed, unencrypted) data.
	Size uint32 `json:"size,omitempty"`
	// Error carries an error message reported in the response body.
	Error string `json:"error,omitempty"`
	// ETag is taken from the response's ETag header, without surrounding quotes.
	ETag string `json:"eTag,omitempty"`
	// CipherKey is the key the payload was encrypted with, when encryption was
	// requested.
	CipherKey []byte `json:"cipherKey,omitempty"`
	// Mime is the MIME type associated with the upload.
	Mime string `json:"mime,omitempty"`
	// Gzip is set to 1 by the uploader when the stored content is gzipped.
	Gzip uint32 `json:"gzip,omitempty"`
	// ContentMd5 is taken from the response's Content-MD5 header.
	ContentMd5 string `json:"contentMd5,omitempty"`
	// RetryCount is the zero-based index of the attempt that succeeded; it is
	// never serialized.
	RetryCount int `json:"-"`
}
|
|
|
|
func (uploadResult *UploadResult) ToPbFileChunk(fileId string, offset int64, tsNs int64) *filer_pb.FileChunk {
|
|
fid, _ := filer_pb.ToFileIdObject(fileId)
|
|
return &filer_pb.FileChunk{
|
|
FileId: fileId,
|
|
Offset: offset,
|
|
Size: uint64(uploadResult.Size),
|
|
ModifiedTsNs: tsNs,
|
|
ETag: uploadResult.ContentMd5,
|
|
CipherKey: uploadResult.CipherKey,
|
|
IsCompressed: uploadResult.Gzip > 0,
|
|
Fid: fid,
|
|
}
|
|
}
|
|
|
|
// ToPbFileChunkWithSSE creates a FileChunk with SSE metadata
|
|
func (uploadResult *UploadResult) ToPbFileChunkWithSSE(fileId string, offset int64, tsNs int64, sseType filer_pb.SSEType, sseMetadata []byte) *filer_pb.FileChunk {
|
|
fid, _ := filer_pb.ToFileIdObject(fileId)
|
|
chunk := &filer_pb.FileChunk{
|
|
FileId: fileId,
|
|
Offset: offset,
|
|
Size: uint64(uploadResult.Size),
|
|
ModifiedTsNs: tsNs,
|
|
ETag: uploadResult.ContentMd5,
|
|
CipherKey: uploadResult.CipherKey,
|
|
IsCompressed: uploadResult.Gzip > 0,
|
|
Fid: fid,
|
|
}
|
|
|
|
// Add SSE metadata if provided
|
|
chunk.SseType = sseType
|
|
if len(sseMetadata) > 0 {
|
|
chunk.SseMetadata = sseMetadata
|
|
}
|
|
|
|
return chunk
|
|
}
|
|
|
|
var (
|
|
uploader *Uploader
|
|
uploaderErr error
|
|
once sync.Once
|
|
)
|
|
|
|
// HTTPClient is the minimal request-executing surface the Uploader depends
// on. Declaring it as an interface allows a substitute client in tests.
type HTTPClient interface {
	// Do sends req and returns the response, or an error.
	Do(req *http.Request) (*http.Response, error)
}
|
|
|
|
// Uploader
|
|
type Uploader struct {
|
|
httpClient HTTPClient
|
|
}
|
|
|
|
func NewUploader() (*Uploader, error) {
|
|
once.Do(func() {
|
|
// With Dial context
|
|
var httpClient *util_http_client.HTTPClient
|
|
httpClient, uploaderErr = util_http.NewGlobalHttpClient(util_http_client.AddDialContext)
|
|
if uploaderErr != nil {
|
|
uploaderErr = fmt.Errorf("error initializing the loader: %s", uploaderErr)
|
|
}
|
|
if httpClient != nil {
|
|
uploader = newUploader(httpClient)
|
|
}
|
|
})
|
|
return uploader, uploaderErr
|
|
}
|
|
|
|
func newUploader(httpClient HTTPClient) *Uploader {
|
|
return &Uploader{
|
|
httpClient: httpClient,
|
|
}
|
|
}
|
|
|
|
// UploadWithRetry will retry both assigning volume request and uploading content
|
|
// The option parameter does not need to specify UploadUrl and Jwt, which will come from assigning volume.
|
|
func (uploader *Uploader) UploadWithRetry(filerClient filer_pb.FilerClient, assignRequest *filer_pb.AssignVolumeRequest, uploadOption *UploadOption, genFileUrlFn func(host, fileId string) string, reader io.Reader) (fileId string, uploadResult *UploadResult, err error, data []byte) {
|
|
doUploadFunc := func() error {
|
|
|
|
var host string
|
|
var auth security.EncodedJwt
|
|
|
|
// grpc assign volume
|
|
if grpcAssignErr := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
|
|
resp, assignErr := client.AssignVolume(context.Background(), assignRequest)
|
|
if assignErr != nil {
|
|
glog.V(0).Infof("assign volume failure %v: %v", assignRequest, assignErr)
|
|
return assignErr
|
|
}
|
|
if resp.Error != "" {
|
|
return fmt.Errorf("assign volume failure %v: %v", assignRequest, resp.Error)
|
|
}
|
|
|
|
fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth)
|
|
loc := resp.Location
|
|
host = filerClient.AdjustedUrl(loc)
|
|
|
|
return nil
|
|
}); grpcAssignErr != nil {
|
|
return fmt.Errorf("filerGrpcAddress assign volume: %w", grpcAssignErr)
|
|
}
|
|
|
|
uploadOption.UploadUrl = genFileUrlFn(host, fileId)
|
|
uploadOption.Jwt = auth
|
|
|
|
var uploadErr error
|
|
uploadResult, uploadErr, data = uploader.doUpload(context.Background(), reader, uploadOption)
|
|
return uploadErr
|
|
}
|
|
if uploadOption.RetryForever {
|
|
util.RetryUntil("uploadWithRetryForever", doUploadFunc, func(err error) (shouldContinue bool) {
|
|
glog.V(0).Infof("upload content: %v", err)
|
|
return true
|
|
})
|
|
} else {
|
|
uploadErrList := []string{"transport", "is read only"}
|
|
err = util.MultiRetry("uploadWithRetry", uploadErrList, doUploadFunc)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Upload sends a POST request to a volume server to upload the content with adjustable compression level
|
|
func (uploader *Uploader) UploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
|
|
uploadResult, err = uploader.retriedUploadData(ctx, data, option)
|
|
return
|
|
}
|
|
|
|
// Upload sends a POST request to a volume server to upload the content with fast compression
|
|
func (uploader *Uploader) Upload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
|
|
uploadResult, err, data = uploader.doUpload(ctx, reader, option)
|
|
return
|
|
}
|
|
|
|
func (uploader *Uploader) doUpload(ctx context.Context, reader io.Reader, option *UploadOption) (uploadResult *UploadResult, err error, data []byte) {
|
|
bytesReader, ok := reader.(*util.BytesReader)
|
|
if ok {
|
|
data = bytesReader.Bytes
|
|
} else {
|
|
data, err = io.ReadAll(reader)
|
|
if err != nil {
|
|
err = fmt.Errorf("read input: %w", err)
|
|
return
|
|
}
|
|
}
|
|
uploadResult, uploadErr := uploader.retriedUploadData(ctx, data, option)
|
|
return uploadResult, uploadErr, data
|
|
}
|
|
|
|
func (uploader *Uploader) retriedUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
|
|
for i := 0; i < 3; i++ {
|
|
if i > 0 {
|
|
time.Sleep(time.Millisecond * time.Duration(237*(i+1)))
|
|
}
|
|
uploadResult, err = uploader.doUploadData(ctx, data, option)
|
|
if err == nil {
|
|
uploadResult.RetryCount = i
|
|
return
|
|
}
|
|
glog.WarningfCtx(ctx, "uploading %d to %s: %v", i, option.UploadUrl, err)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (uploader *Uploader) doUploadData(ctx context.Context, data []byte, option *UploadOption) (uploadResult *UploadResult, err error) {
|
|
contentIsGzipped := option.IsInputCompressed
|
|
shouldGzipNow := false
|
|
if !option.IsInputCompressed {
|
|
if option.MimeType == "" {
|
|
option.MimeType = http.DetectContentType(data)
|
|
// println("detect1 mimetype to", MimeType)
|
|
if option.MimeType == "application/octet-stream" {
|
|
option.MimeType = ""
|
|
}
|
|
}
|
|
if shouldBeCompressed, iAmSure := util.IsCompressableFileType(filepath.Base(option.Filename), option.MimeType); iAmSure && shouldBeCompressed {
|
|
shouldGzipNow = true
|
|
} else if !iAmSure && option.MimeType == "" && len(data) > 16*1024 {
|
|
var compressed []byte
|
|
compressed, err = util.GzipData(data[0:128])
|
|
if err != nil {
|
|
return
|
|
}
|
|
shouldGzipNow = len(compressed)*10 < 128*9 // can not compress to less than 90%
|
|
}
|
|
}
|
|
|
|
var clearDataLen int
|
|
|
|
// gzip if possible
|
|
// this could be double copying
|
|
clearDataLen = len(data)
|
|
clearData := data
|
|
if shouldGzipNow {
|
|
compressed, compressErr := util.GzipData(data)
|
|
// fmt.Printf("data is compressed from %d ==> %d\n", len(data), len(compressed))
|
|
if compressErr == nil {
|
|
if len(compressed) < len(data) {
|
|
data = compressed
|
|
contentIsGzipped = true
|
|
}
|
|
}
|
|
} else if option.IsInputCompressed {
|
|
// just to get the clear data length
|
|
clearData, err = util.DecompressData(data)
|
|
if err == nil {
|
|
clearDataLen = len(clearData)
|
|
}
|
|
}
|
|
|
|
if option.Cipher {
|
|
// encrypt(gzip(data))
|
|
|
|
// encrypt
|
|
cipherKey := util.GenCipherKey()
|
|
encryptedData, encryptionErr := util.Encrypt(data, cipherKey)
|
|
if encryptionErr != nil {
|
|
err = fmt.Errorf("encrypt input: %w", encryptionErr)
|
|
return
|
|
}
|
|
|
|
// upload data
|
|
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
|
|
_, err = w.Write(encryptedData)
|
|
return
|
|
}, len(encryptedData), &UploadOption{
|
|
UploadUrl: option.UploadUrl,
|
|
Filename: "",
|
|
Cipher: false,
|
|
IsInputCompressed: false,
|
|
MimeType: "",
|
|
PairMap: nil,
|
|
Jwt: option.Jwt,
|
|
})
|
|
if uploadResult == nil {
|
|
return
|
|
}
|
|
uploadResult.Name = option.Filename
|
|
uploadResult.Mime = option.MimeType
|
|
uploadResult.CipherKey = cipherKey
|
|
uploadResult.Size = uint32(clearDataLen)
|
|
if contentIsGzipped {
|
|
uploadResult.Gzip = 1
|
|
}
|
|
} else {
|
|
// upload data
|
|
uploadResult, err = uploader.upload_content(ctx, func(w io.Writer) (err error) {
|
|
_, err = w.Write(data)
|
|
return
|
|
}, len(data), &UploadOption{
|
|
UploadUrl: option.UploadUrl,
|
|
Filename: option.Filename,
|
|
Cipher: false,
|
|
IsInputCompressed: contentIsGzipped,
|
|
MimeType: option.MimeType,
|
|
PairMap: option.PairMap,
|
|
Jwt: option.Jwt,
|
|
Md5: option.Md5,
|
|
BytesBuffer: option.BytesBuffer,
|
|
})
|
|
if uploadResult == nil {
|
|
return
|
|
}
|
|
uploadResult.Size = uint32(clearDataLen)
|
|
if contentIsGzipped {
|
|
uploadResult.Gzip = 1
|
|
}
|
|
}
|
|
|
|
return uploadResult, err
|
|
}
|
|
|
|
func (uploader *Uploader) upload_content(ctx context.Context, fillBufferFunction func(w io.Writer) error, originalDataSize int, option *UploadOption) (*UploadResult, error) {
|
|
var body_writer *multipart.Writer
|
|
var reqReader *bytes.Reader
|
|
var buf *bytebufferpool.ByteBuffer
|
|
if option.BytesBuffer == nil {
|
|
buf = GetBuffer()
|
|
defer PutBuffer(buf)
|
|
body_writer = multipart.NewWriter(buf)
|
|
} else {
|
|
option.BytesBuffer.Reset()
|
|
body_writer = multipart.NewWriter(option.BytesBuffer)
|
|
}
|
|
h := make(textproto.MIMEHeader)
|
|
// Use mime.FormatMediaType for RFC 6266 compliant Content-Disposition,
|
|
// properly handling non-ASCII characters and special characters
|
|
h.Set("Content-Disposition", mime.FormatMediaType("form-data", map[string]string{"name": "file", "filename": option.Filename}))
|
|
h.Set("Idempotency-Key", option.UploadUrl)
|
|
if option.MimeType == "" {
|
|
option.MimeType = mime.TypeByExtension(strings.ToLower(filepath.Ext(option.Filename)))
|
|
}
|
|
if option.MimeType != "" {
|
|
h.Set("Content-Type", option.MimeType)
|
|
}
|
|
if option.IsInputCompressed {
|
|
h.Set("Content-Encoding", "gzip")
|
|
}
|
|
if option.Md5 != "" {
|
|
h.Set("Content-MD5", option.Md5)
|
|
}
|
|
|
|
file_writer, cp_err := body_writer.CreatePart(h)
|
|
if cp_err != nil {
|
|
glog.V(0).InfolnCtx(ctx, "error creating form file", cp_err.Error())
|
|
return nil, cp_err
|
|
}
|
|
if err := fillBufferFunction(file_writer); err != nil {
|
|
glog.V(0).InfolnCtx(ctx, "error copying data", err)
|
|
return nil, err
|
|
}
|
|
content_type := body_writer.FormDataContentType()
|
|
if err := body_writer.Close(); err != nil {
|
|
glog.V(0).InfolnCtx(ctx, "error closing body", err)
|
|
return nil, err
|
|
}
|
|
if option.BytesBuffer == nil {
|
|
reqReader = bytes.NewReader(buf.Bytes())
|
|
} else {
|
|
reqReader = bytes.NewReader(option.BytesBuffer.Bytes())
|
|
}
|
|
req, postErr := http.NewRequestWithContext(ctx, http.MethodPost, option.UploadUrl, reqReader)
|
|
if postErr != nil {
|
|
glog.V(1).InfofCtx(ctx, "create upload request %s: %v", option.UploadUrl, postErr)
|
|
return nil, fmt.Errorf("create upload request %s: %v", option.UploadUrl, postErr)
|
|
}
|
|
req.Header.Set("Content-Type", content_type)
|
|
for k, v := range option.PairMap {
|
|
req.Header.Set(k, v)
|
|
}
|
|
if option.Jwt != "" {
|
|
req.Header.Set("Authorization", "BEARER "+string(option.Jwt))
|
|
}
|
|
|
|
request_id.InjectToRequest(ctx, req)
|
|
|
|
// print("+")
|
|
resp, post_err := uploader.httpClient.Do(req)
|
|
defer util_http.CloseResponse(resp)
|
|
if post_err != nil {
|
|
if strings.Contains(post_err.Error(), "connection reset by peer") ||
|
|
strings.Contains(post_err.Error(), "use of closed network connection") {
|
|
glog.V(1).InfofCtx(ctx, "repeat error upload request %s: %v", option.UploadUrl, postErr)
|
|
stats.FilerHandlerCounter.WithLabelValues(stats.RepeatErrorUploadContent).Inc()
|
|
resp, post_err = uploader.httpClient.Do(req)
|
|
defer util_http.CloseResponse(resp)
|
|
}
|
|
}
|
|
if post_err != nil {
|
|
return nil, fmt.Errorf("upload %s %d bytes to %v: %v", option.Filename, originalDataSize, option.UploadUrl, post_err)
|
|
}
|
|
// print("-")
|
|
|
|
var ret UploadResult
|
|
etag := getEtag(resp)
|
|
if resp.StatusCode == http.StatusNoContent {
|
|
ret.ETag = etag
|
|
return &ret, nil
|
|
}
|
|
|
|
resp_body, ra_err := io.ReadAll(resp.Body)
|
|
if ra_err != nil {
|
|
return nil, fmt.Errorf("read response body %v: %w", option.UploadUrl, ra_err)
|
|
}
|
|
|
|
unmarshal_err := json.Unmarshal(resp_body, &ret)
|
|
if unmarshal_err != nil {
|
|
glog.ErrorfCtx(ctx, "unmarshal %s: %v", option.UploadUrl, string(resp_body))
|
|
return nil, fmt.Errorf("unmarshal %v: %w", option.UploadUrl, unmarshal_err)
|
|
}
|
|
if ret.Error != "" {
|
|
return nil, fmt.Errorf("unmarshalled error %v: %v", option.UploadUrl, ret.Error)
|
|
}
|
|
ret.ETag = etag
|
|
ret.ContentMd5 = resp.Header.Get("Content-MD5")
|
|
return &ret, nil
|
|
}
|
|
|
|
// getEtag returns the value of the response's ETag header with one pair of
// surrounding double quotes removed. A value that is not wrapped in a matching
// pair of quotes is returned unchanged, as is a value consisting of a single
// quote character; a missing header yields "".
func getEtag(r *http.Response) (etag string) {
	etag = r.Header.Get("ETag")
	// Require at least two characters: a lone `"` is both prefix and suffix,
	// and slicing it as etag[1:0] would panic.
	if len(etag) >= 2 && strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") {
		etag = etag[1 : len(etag)-1]
	}
	return
}
|