Files
seaweedFS/weed/s3api/filer_multipart.go
Chris Lu c284e51d20 fix: multipart upload ETag calculation (#8238)
* fix multipart etag

* address comments

* clean up

* clean up

* optimization

* address comments

* unquoted etag

* dedup

* upgrade

* clean

* etag

* return quoted tag

* quoted etag

* debug

* s3api: unify ETag retrieval and quoting across handlers

Refactor newListEntry to take *S3ApiServer and use getObjectETag,
and update setResponseHeaders to use the same logic. This ensures
consistent ETags are returned for both listing and direct access.

* s3api: implement ListObjects deduplication for versioned buckets

Handle duplicate entries between the main path and the .versions
directory by prioritizing the latest version when bucket versioning
is enabled.

* s3api: cleanup stale main file entries during versioned uploads

Add explicit deletion of pre-existing "main" files when creating new
versions in versioned buckets. This prevents stale entries from
appearing in bucket listings and ensures consistency.

* s3api: fix cleanup code placement in versioned uploads

Correct the placement of rm calls in completeMultipartUpload and
putVersionedObject to ensure stale main files are properly deleted
during versioned uploads.

* s3api: improve getObjectETag fallback for empty ExtETagKey

Ensure that when ExtETagKey exists but contains an empty value,
the function falls through to MD5/chunk-based calculation instead
of returning an empty string.

* s3api: fix test files for new newListEntry signature

Update test files to use the new newListEntry signature where the
first parameter is *S3ApiServer. Created mockS3ApiServer to properly
test owner display name lookup functionality.

* s3api: use filer.ETag for consistent Md5 handling in getEtagFromEntry

Change getEtagFromEntry fallback to use filer.ETag(entry) instead of
filer.ETagChunks to ensure legacy entries with Attributes.Md5 are
handled consistently with the rest of the codebase.

* s3api: optimize list logic and fix conditional header logging

- Hoist bucket versioning check out of per-entry callback to avoid
  repeated getVersioningState calls
- Extract appendOrDedup helper function to eliminate duplicate
  dedup/append logic across multiple code paths
- Change If-Match mismatch logging from glog.Errorf to glog.V(3).Infof
  and remove DEBUG prefix for consistency

* s3api: fix test mock to properly initialize IAM accounts

Fixed nil pointer dereference in TestNewListEntryOwnerDisplayName by
directly initializing the IdentityAccessManagement.accounts map in the
test setup. This ensures newListEntry can properly look up account
display names without panicking.

* cleanup

* s3api: remove premature main file cleanup in versioned uploads

Removed incorrect cleanup logic that was deleting main files during
versioned uploads. This was causing test failures because it deleted
objects that should have been preserved as null versions when
versioning was first enabled. The deduplication logic in listing is
sufficient to handle duplicate entries without deleting files during
upload.

* s3api: add empty-value guard to getEtagFromEntry

Added the same empty-value guard used in getObjectETag to prevent
returning quoted empty strings. When ExtETagKey exists but is empty,
the function now falls through to filer.ETag calculation instead of
returning "".

* s3api: fix listing of directory key objects with matching prefix

Revert prefix handling logic to use strings.TrimPrefix instead of
checking HasPrefix with empty string result. This ensures that when a
directory key object exactly matches the prefix (e.g. prefix="dir/",
object="dir/"), it is correctly handled as a regular entry instead of
being skipped or incorrectly processed as a common prefix. Also fixed
missing variable definition.

* s3api: refactor list inline dedup to use appendOrDedup helper

Refactored the inline deduplication logic in listFilerEntries to use the
shared appendOrDedup helper function. This ensures consistent behavior
and reduces code duplication.

* test: fix port allocation race in s3tables integration test

Updated startMiniCluster to find all required ports simultaneously using
findAvailablePorts instead of sequentially. This prevents race conditions
where the OS reallocates a port that was just released, causing multiple
services (e.g. Filer and Volume) to be assigned the same port and fail
to start.
2026-02-06 21:54:43 -08:00

1004 lines
38 KiB
Go

package s3api
import (
"cmp"
"crypto/md5"
"crypto/rand"
"encoding/base64"
"encoding/hex"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"math"
"net/url"
"path"
"slices"
"sort"
"strconv"
"strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"net/http"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// Multipart upload constants shared by the handlers in this file.
const (
// multipartExt is the file-name suffix that marks an entry inside an upload
// directory as an uploaded part.
multipartExt = ".part"
// multiPartMinSize is the minimum size in bytes (5 MiB) that
// completeMultipartUpload requires of every part except the highest-numbered
// one when more than one part is being completed.
multiPartMinSize = 5 * 1024 * 1024
)
// InitiateMultipartUploadResult is the XML response body produced by
// createMultipartUpload. It embeds s3.CreateMultipartUploadOutput and fixes the
// root element name and the S3 2006-03-01 namespace through XMLName.
type InitiateMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult"`
s3.CreateMultipartUploadOutput
}
// getRequestScheme determines the URL scheme ("http" or "https") of the request.
//
// The X-Forwarded-Proto header is consulted first so that requests arriving
// through a proxy report the scheme the client actually used. The header is
// client-controllable and may hold a comma-separated list when several proxies
// are chained, so only the first element is considered, it is compared
// case-insensitively, and anything other than "http" or "https" is ignored.
// When the header is absent or unusable the scheme is derived from the TLS
// state of the connection.
func getRequestScheme(r *http.Request) string {
	if forwarded := r.Header.Get("X-Forwarded-Proto"); forwarded != "" {
		first, _, _ := strings.Cut(forwarded, ",")
		switch proto := strings.ToLower(strings.TrimSpace(first)); proto {
		case "http", "https":
			return proto
		}
		// Unknown value: fall through to the TLS check rather than echoing
		// arbitrary header content into response URLs.
	}
	if r.TLS != nil {
		return "https"
	}
	return "http"
}
// createMultipartUpload starts a multipart upload for input.Bucket / input.Key.
//
// It generates an upload ID, creates a directory with that name under the
// bucket's uploads folder, and stores on the directory entry the object key,
// the owner account (when the account header is present), the user metadata,
// the content type, the encryption configuration and any object lock metadata
// taken from the request.
//
// On success it returns the bucket, key and upload ID; code is left at its
// zero value. On failure it returns a nil output and the error code produced
// by handleMultipartInternalError.
func (s3a *S3ApiServer) createMultipartUpload(r *http.Request, input *s3.CreateMultipartUploadInput) (output *InitiateMultipartUploadResult, code s3err.ErrorCode) {
	glog.V(2).Infof("createMultipartUpload input %v", input)

	uploadIdString := s3a.generateUploadID(*input.Key)
	uploadIdString = uploadIdString + "_" + strings.ReplaceAll(uuid.New().String(), "-", "")
	uploadsFolder := s3a.genUploadsFolder(*input.Bucket)

	// The mkdir callback cannot return an error, so an encryption setup
	// failure is carried out of the closure through this variable.
	var encryptionError error

	if err := s3a.mkdir(uploadsFolder, uploadIdString, func(entry *filer_pb.Entry) {
		if entry.Extended == nil {
			entry.Extended = make(map[string][]byte)
		}
		entry.Extended[s3_constants.ExtMultipartObjectKey] = []byte(*input.Key)

		// Set object owner for multipart upload
		amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
		if amzAccountId != "" {
			entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
		}

		for k, v := range input.Metadata {
			if v == nil {
				// A nil value carries no metadata; skip it instead of
				// dereferencing a nil pointer.
				continue
			}
			entry.Extended[k] = []byte(*v)
		}

		if input.ContentType != nil {
			entry.Attributes.Mime = *input.ContentType
		}

		// Prepare the encryption configuration and record it on the entry.
		encryptionConfig, prepErr := s3a.prepareMultipartEncryptionConfig(r, *input.Bucket, uploadIdString)
		if prepErr != nil {
			encryptionError = prepErr
			return // handled after mkdir returns
		}
		s3a.applyMultipartEncryptionConfig(entry, encryptionConfig)

		// Extract and store object lock metadata from request headers so that
		// object lock settings given at creation time are preserved.
		if err := s3a.extractObjectLockMetadataFromRequest(r, entry); err != nil {
			glog.Errorf("createMultipartUpload: failed to extract object lock metadata: %v", err)
			// Don't fail the upload - this matches AWS behavior for invalid metadata
		}
	}); err != nil {
		_, errorCode := handleMultipartInternalError("create multipart upload directory", err)
		return nil, errorCode
	}

	// Check for encryption configuration errors that occurred within the callback.
	if encryptionError != nil {
		// mkdir is not told about the callback failure, so the upload directory
		// may have been created without encryption metadata. The client never
		// receives this upload ID, so remove the directory (best effort) rather
		// than leave an orphan behind.
		if rmErr := s3a.rm(uploadsFolder, uploadIdString, true, true); rmErr != nil {
			glog.Warningf("createMultipartUpload: cleanup of upload %s in bucket %s failed: %v", uploadIdString, *input.Bucket, rmErr)
		}
		_, errorCode := handleMultipartInternalError("prepare encryption configuration", encryptionError)
		return nil, errorCode
	}

	output = &InitiateMultipartUploadResult{
		CreateMultipartUploadOutput: s3.CreateMultipartUploadOutput{
			Bucket:   input.Bucket,
			Key:      objectKey(input.Key),
			UploadId: aws.String(uploadIdString),
		},
	}
	return output, code
}
// CompleteMultipartUploadResult is the XML response body produced by
// completeMultipartUpload. Location, Bucket, Key and ETag are marshalled
// (each omitted when nil); VersionId is carried alongside but excluded from
// the XML by its `xml:"-"` tag.
type CompleteMultipartUploadResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult"`
Location *string `xml:"Location,omitempty"`
Bucket *string `xml:"Bucket,omitempty"`
Key *string `xml:"Key,omitempty"`
ETag *string `xml:"ETag,omitempty"`
// VersionId is NOT included in XML body - it should only be in x-amz-version-id HTTP header
// Store the VersionId internally for setting HTTP header, but don't marshal to XML
VersionId *string `xml:"-"`
}
// copySSEHeadersFromFirstPart copies every SSE-related extended attribute that
// is present on firstPart onto dst. Attributes already on dst under the same
// key are overwritten; attributes absent from firstPart are left untouched.
//
// dst is the entry being assembled, firstPart is the entry of the first
// completed part, and context is a free-form label used only in the log line.
//
// The call is a no-op when dst or firstPart is nil or firstPart has no
// extended attributes. dst.Extended is allocated on demand, so callers do not
// have to initialise the map beforehand.
func copySSEHeadersFromFirstPart(dst *filer_pb.Entry, firstPart *filer_pb.Entry, context string) {
	if dst == nil || firstPart == nil || firstPart.Extended == nil {
		return
	}

	// Copy ALL SSE-related headers (not just SeaweedFSSSEKMSKey).
	sseKeys := []string{
		// SSE-C headers
		s3_constants.SeaweedFSSSEIV,
		s3_constants.AmzServerSideEncryptionCustomerAlgorithm,
		s3_constants.AmzServerSideEncryptionCustomerKeyMD5,
		// SSE-KMS headers
		s3_constants.SeaweedFSSSEKMSKey,
		s3_constants.AmzServerSideEncryptionAwsKmsKeyId,
		// SSE-S3 headers
		s3_constants.SeaweedFSSSES3Key,
		// Common SSE header (for SSE-KMS and SSE-S3)
		s3_constants.AmzServerSideEncryption,
	}

	for _, key := range sseKeys {
		value, exists := firstPart.Extended[key]
		if !exists {
			continue
		}
		if dst.Extended == nil {
			// Writing to a nil map panics; create it only once there is
			// something to store.
			dst.Extended = make(map[string][]byte)
		}
		dst.Extended[key] = value
		glog.V(4).Infof("completeMultipartUpload: copied SSE header %s from first part (%s)", key, context)
	}
}
// completeMultipartUpload assembles the uploaded parts named in parts into the
// final object for input.Bucket / input.Key and then removes the upload
// directory.
//
// The visible steps are:
//  1. Index the client-supplied part numbers and ETags.
//  2. List the upload directory and select, per requested part number, the
//     part entries whose MD5 equals a requested ETag. ETags that are not 32
//     characters long are only warned about, not rejected. Part entries for a
//     requested number that end up unselected are queued for deletion.
//  3. When more than one part is requested, fail if any selected part other
//     than the highest-numbered one is smaller than multiPartMinSize.
//  4. Concatenate the chunks of the selected parts with new logical offsets
//     and record, per part, the range of chunk indexes it occupies.
//  5. Write the final entry; where and how depends on the bucket's versioning
//     state (enabled, suspended, or anything else including a lookup error).
//  6. Best-effort removal of the unused part entries and the upload directory.
//
// Failures are reported as s3err.ErrNoSuchUpload, s3err.ErrEntityTooSmall,
// s3err.ErrInvalidPart or s3err.ErrInternalError with a nil output. On success
// output is populated and code is left at its zero value.
func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.CompleteMultipartUploadInput, parts *CompleteMultipartUpload) (output *CompleteMultipartUploadResult, code s3err.ErrorCode) {
glog.V(2).Infof("completeMultipartUpload input %v", input)
if len(parts.Parts) == 0 {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
// Group the requested ETags by part number. The same part number may occur
// more than once in the request, so each number maps to a list of ETags while
// completedPartNumbers holds every number exactly once.
completedPartNumbers := []int{}
completedPartMap := make(map[int][]string)
// maxPartNo tracks the highest requested part number (never below 1); it is
// the only part allowed to have no chunks further down.
maxPartNo := 1
for _, part := range parts.Parts {
if _, ok := completedPartMap[part.PartNumber]; !ok {
completedPartNumbers = append(completedPartNumbers, part.PartNumber)
}
completedPartMap[part.PartNumber] = append(completedPartMap[part.PartNumber], part.ETag)
maxPartNo = maxInt(maxPartNo, part.PartNumber)
}
sort.Ints(completedPartNumbers)
uploadDirectory := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
// Use explicit limit to ensure all parts are listed (up to S3's max of 10,000 parts)
// Previously limit=0 relied on server's DirListingLimit default (1000 in weed server mode),
// which caused CompleteMultipartUpload to fail for uploads with more than 1000 parts.
entries, _, err := s3a.list(uploadDirectory, "", "", false, s3_constants.MaxS3MultipartParts+1)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v, entries:%d", *input.Bucket, *input.UploadId, err, len(entries))
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
// Nothing is listed under the upload directory. If the destination object
// already records this upload ID, answer with that object's ETag instead of
// failing (presumably a repeated completion request - confirm with callers).
if len(entries) == 0 {
entryName, dirName := s3a.getEntryNameAndDir(input)
if entry, _ := s3a.getEntry(dirName, entryName); entry != nil && entry.Extended != nil {
if uploadId, ok := entry.Extended[s3_constants.SeaweedFSUploadId]; ok && *input.UploadId == string(uploadId) {
// Location uses the S3 endpoint that the client connected to
// Format: scheme://s3-endpoint/bucket/object (following AWS S3 API)
return &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(getEtagFromEntry(entry)),
Key: objectKey(input.Key),
}, s3err.ErrNone
}
}
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
// pentry is the upload directory entry itself; its Extended attributes and
// Mime are copied onto the final object below.
pentry, err := s3a.getEntry(s3a.genUploadsFolder(*input.Bucket), *input.UploadId)
if err != nil {
glog.Errorf("completeMultipartUpload %s %s error: %v", *input.Bucket, *input.UploadId, err)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedNoSuchUpload).Inc()
return nil, s3err.ErrNoSuchUpload
}
// deleteEntries collects part entries that will not be used in the final object.
deleteEntries := []*filer_pb.Entry{}
// partEntries maps a part number to the listed entries accepted for it.
partEntries := make(map[int][]*filer_pb.Entry, len(entries))
entityTooSmall := false
entityWithTtl := false
for _, entry := range entries {
foundEntry := false
glog.V(4).Infof("completeMultipartUpload part entries %s", entry.Name)
// Only regular entries carrying the part suffix are candidates.
if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
continue
}
partNumber, err := parsePartNumber(entry.Name)
if err != nil {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNumber).Inc()
glog.Errorf("completeMultipartUpload failed to pasre partNumber %s:%s", entry.Name, err)
continue
}
// Parts that the client did not list are skipped here without being queued
// for deletion.
completedPartsByNumber, ok := completedPartMap[partNumber]
if !ok {
continue
}
for _, partETag := range completedPartsByNumber {
partETag = strings.Trim(partETag, `"`)
entryETag := hex.EncodeToString(entry.Attributes.GetMd5())
// A mismatch rejects this (entry, ETag) pair only when both values are
// usable for comparison; otherwise the pair is accepted with a warning.
if partETag != "" && len(partETag) == 32 && entryETag != "" {
if entryETag != partETag {
glog.Errorf("completeMultipartUpload %s ETag mismatch chunk: %s part: %s", entry.Name, entryETag, partETag)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagMismatch).Inc()
continue
}
} else {
glog.Warningf("invalid complete etag %s, partEtag %s", partETag, entryETag)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedEtagInvalid).Inc()
}
// An entry without chunks is tolerated only for the highest requested part number.
if len(entry.Chunks) == 0 && partNumber != maxPartNo {
glog.Warningf("completeMultipartUpload %s empty chunks", entry.Name)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEmpty).Inc()
continue
}
// There may be several entries for the same part number because of client retries.
partEntries[partNumber] = append(partEntries[partNumber], entry)
foundEntry = true
}
if foundEntry {
if !entityWithTtl && entry.Attributes != nil && entry.Attributes.TtlSec > 0 {
entityWithTtl = true
}
// completedPartNumbers is sorted, so its last element is the highest
// requested part number; every other part must meet the minimum size.
if len(completedPartNumbers) > 1 && partNumber != completedPartNumbers[len(completedPartNumbers)-1] &&
entry.Attributes.FileSize < multiPartMinSize {
glog.Warningf("completeMultipartUpload %s part file size less 5mb", entry.Name)
entityTooSmall = true
}
} else {
deleteEntries = append(deleteEntries, entry)
}
}
if entityTooSmall {
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompleteEntityTooSmall).Inc()
return nil, s3err.ErrEntityTooSmall
}
mime := pentry.Attributes.Mime
// finalParts is the chunk list of the assembled object; offset is the running
// logical offset and ends up as the total object size.
var finalParts []*filer_pb.FileChunk
var offset int64
// Track part boundaries for later retrieval with PartNumber parameter
type PartBoundary struct {
PartNumber int `json:"part"`
StartChunk int `json:"start"`
EndChunk int `json:"end"` // exclusive
ETag string `json:"etag"`
}
var partBoundaries []PartBoundary
// Walk the requested part numbers in ascending order and append the chunks of
// one entry per part.
for _, partNumber := range completedPartNumbers {
partEntriesByNumber, ok := partEntries[partNumber]
if !ok {
glog.Errorf("part %d has no entry", partNumber)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartNotFound).Inc()
return nil, s3err.ErrInvalidPart
}
found := false
// NOTE(review): sortEntriesByLatestChunk is defined elsewhere; presumably it
// orders the candidates so that the first one is the most recent - confirm.
if len(partEntriesByNumber) > 1 {
sortEntriesByLatestChunk(partEntriesByNumber)
}
for _, entry := range partEntriesByNumber {
// Only the first candidate is used; the remaining ones are queued for deletion.
if found {
deleteEntries = append(deleteEntries, entry)
stats.S3HandlerCounter.WithLabelValues(stats.ErrorCompletedPartEntryMismatch).Inc()
continue
}
// Record the start chunk index for this part
partStartChunk := len(finalParts)
// Calculate the part's ETag (for GetObject with PartNumber)
partETag := filer.ETag(entry)
for _, chunk := range entry.GetChunks() {
// CRITICAL: Do NOT modify SSE metadata offsets during assembly!
// The encrypted data was created with the offset stored in chunk.SseMetadata.
// Changing the offset here would cause decryption to fail because CTR mode
// uses the offset to initialize the counter. We must decrypt with the same
// offset that was used during encryption.
p := &filer_pb.FileChunk{
FileId: chunk.GetFileIdString(),
Offset: offset,
Size: chunk.Size,
ModifiedTsNs: chunk.ModifiedTsNs,
CipherKey: chunk.CipherKey,
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
// Preserve SSE metadata UNCHANGED - do not modify the offset!
SseType: chunk.SseType,
SseMetadata: chunk.SseMetadata,
}
finalParts = append(finalParts, p)
offset += int64(chunk.Size)
}
// Record the part boundary
partEndChunk := len(finalParts)
partBoundaries = append(partBoundaries, PartBoundary{
PartNumber: partNumber,
StartChunk: partStartChunk,
EndChunk: partEndChunk,
ETag: partETag,
})
found = true
}
}
entryName, dirName := s3a.getEntryNameAndDir(input)
// Precompute ETag once for consistency across all paths
multipartETag := calculateMultipartETag(partEntries, completedPartNumbers)
// etagQuote is the ETag wrapped in double quotes, used in the response body;
// the unquoted form is what gets stored on the entry.
etagQuote := "\"" + multipartETag + "\""
// Check if versioning is configured for this bucket BEFORE creating any files
versioningState, vErr := s3a.getVersioningState(*input.Bucket)
if vErr == nil && versioningState == s3_constants.VersioningEnabled {
// Use full object key (not just entryName) to ensure correct .versions directory is checked
normalizedKey := strings.TrimPrefix(*input.Key, "/")
useInvertedFormat := s3a.getVersionIdFormat(*input.Bucket, normalizedKey)
versionId := generateVersionId(useInvertedFormat)
versionFileName := s3a.getVersionFileName(versionId)
versionDir := dirName + "/" + entryName + s3_constants.VersionsFolder
// Capture timestamp and owner once for consistency between version entry and cache entry
versionMtime := time.Now().Unix()
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
// Create the version file in the .versions directory
err = s3a.mkFile(versionDir, versionFileName, finalParts, func(versionEntry *filer_pb.Entry) {
if versionEntry.Extended == nil {
versionEntry.Extended = make(map[string][]byte)
}
versionEntry.Extended[s3_constants.ExtVersionIdKey] = []byte(versionId)
versionEntry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
versionEntry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for versioned multipart objects
if amzAccountId != "" {
versionEntry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
// Copy the upload directory's extended attributes (except the stored object
// key). This runs after the assignments above, so a key that is also present
// on pentry replaces the value set above.
for k, v := range pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
versionEntry.Extended[k] = v
}
}
// Persist ETag to ensure subsequent HEAD/GET uses the same value
versionEntry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag)
// Preserve ALL SSE metadata from the first part (if any)
// SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(versionEntry, firstPartEntry, "versioned")
}
if pentry.Attributes.Mime != "" {
versionEntry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
versionEntry.Attributes.Mime = mime
}
versionEntry.Attributes.FileSize = uint64(offset)
versionEntry.Attributes.Mtime = versionMtime
})
if err != nil {
glog.Errorf("completeMultipartUpload: failed to create version %s: %v", versionId, err)
return nil, s3err.ErrInternalError
}
// Construct entry with metadata for caching in .versions directory
// Reuse versionMtime to keep list vs. HEAD timestamps aligned
// multipartETag is precomputed
versionEntryForCache := &filer_pb.Entry{
Attributes: &filer_pb.FuseAttributes{
FileSize: uint64(offset),
Mtime: versionMtime,
},
Extended: map[string][]byte{
s3_constants.ExtETagKey: []byte(multipartETag),
},
}
if amzAccountId != "" {
versionEntryForCache.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
// Update the .versions directory metadata to indicate this is the latest version
// Pass entry to cache its metadata for single-scan list efficiency
err = s3a.updateLatestVersionInDirectory(*input.Bucket, *input.Key, versionId, versionFileName, versionEntryForCache)
if err != nil {
glog.Errorf("completeMultipartUpload: failed to update latest version in directory: %v", err)
return nil, s3err.ErrInternalError
}
// For versioned buckets, all content is stored in .versions directory
// The latest version information is tracked in the .versions directory metadata
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etagQuote),
Key: objectKey(input.Key),
VersionId: aws.String(versionId),
}
} else if vErr == nil && versioningState == s3_constants.VersioningSuspended {
// For suspended versioning, add "null" version ID metadata and return "null" version ID
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.ExtVersionIdKey] = []byte("null")
// Store parts count for x-amz-mp-parts-count header
entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, jsonErr := json.Marshal(partBoundaries); jsonErr == nil {
entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for suspended versioning multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
// Copy the upload directory's extended attributes, except the stored object key.
for k, v := range pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
entry.Extended[k] = v
}
}
// Preserve ALL SSE metadata from the first part (if any)
// SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(entry, firstPartEntry, "suspended versioning")
}
// Persist ETag to ensure subsequent HEAD/GET uses the same value
entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag)
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
})
if err != nil {
glog.Errorf("completeMultipartUpload: failed to create suspended versioning object: %v", err)
return nil, s3err.ErrInternalError
}
// Note: Suspended versioning should NOT return VersionId field according to AWS S3 spec
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etagQuote),
Key: objectKey(input.Key),
// VersionId field intentionally omitted for suspended versioning
}
} else {
// For non-versioned buckets, create main object file
// This branch is also taken when getVersioningState returned an error.
err = s3a.mkFile(dirName, entryName, finalParts, func(entry *filer_pb.Entry) {
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSUploadId] = []byte(*input.UploadId)
// Store parts count for x-amz-mp-parts-count header
entry.Extended[s3_constants.SeaweedFSMultipartPartsCount] = []byte(fmt.Sprintf("%d", len(completedPartNumbers)))
// Store part boundaries for GetObject with PartNumber
if partBoundariesJSON, err := json.Marshal(partBoundaries); err == nil {
entry.Extended[s3_constants.SeaweedFSMultipartPartBoundaries] = partBoundariesJSON
}
// Set object owner for non-versioned multipart objects
amzAccountId := r.Header.Get(s3_constants.AmzAccountId)
if amzAccountId != "" {
entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(amzAccountId)
}
// Copy the upload directory's extended attributes, except the stored object key.
for k, v := range pentry.Extended {
if k != s3_constants.ExtMultipartObjectKey {
entry.Extended[k] = v
}
}
// Preserve ALL SSE metadata from the first part (if any)
// SSE metadata is stored in individual parts, not the upload directory
if len(completedPartNumbers) > 0 && len(partEntries[completedPartNumbers[0]]) > 0 {
firstPartEntry := partEntries[completedPartNumbers[0]][0]
copySSEHeadersFromFirstPart(entry, firstPartEntry, "non-versioned")
}
// Persist ETag to ensure subsequent HEAD/GET uses the same value
entry.Extended[s3_constants.ExtETagKey] = []byte(multipartETag)
if pentry.Attributes.Mime != "" {
entry.Attributes.Mime = pentry.Attributes.Mime
} else if mime != "" {
entry.Attributes.Mime = mime
}
entry.Attributes.FileSize = uint64(offset)
// Set TTL-based S3 expiry (modification time)
if entityWithTtl {
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
}
})
if err != nil {
glog.Errorf("completeMultipartUpload %s/%s error: %v", dirName, entryName, err)
return nil, s3err.ErrInternalError
}
// For non-versioned buckets, return response without VersionId
output = &CompleteMultipartUploadResult{
Location: aws.String(fmt.Sprintf("%s://%s/%s/%s", getRequestScheme(r), r.Host, url.PathEscape(*input.Bucket), urlPathEscape(*input.Key))),
Bucket: input.Bucket,
ETag: aws.String(etagQuote),
Key: objectKey(input.Key),
}
}
// Cleanup is best effort: failures are logged and do not change the result.
for _, deleteEntry := range deleteEntries {
// Delete part entries that were not used in the final object.
if err = s3a.rm(uploadDirectory, deleteEntry.Name, true, true); err != nil {
glog.Warningf("completeMultipartUpload cleanup %s upload %s unused %s : %v", *input.Bucket, *input.UploadId, deleteEntry.Name, err)
}
}
// Remove the upload directory itself. The flags differ from the per-part
// removal above (false, true instead of true, true).
if err = s3a.rm(s3a.genUploadsFolder(*input.Bucket), *input.UploadId, false, true); err != nil {
glog.V(1).Infof("completeMultipartUpload cleanup %s upload %s: %v", *input.Bucket, *input.UploadId, err)
}
return
}
// getEntryNameAndDir splits the object key of a complete-multipart request
// into the entry name (last path element of the key) and the directory that
// holds it, expressed as "<BucketsPath>/<bucket>/<key directory>" without a
// trailing slash.
func (s3a *S3ApiServer) getEntryNameAndDir(input *s3.CompleteMultipartUploadInput) (string, string) {
	objectPath := *input.Key

	// path.Dir reports "." for a key without any directory component; treat
	// that as "directly inside the bucket".
	parentDir := path.Dir(objectPath)
	if parentDir == "." {
		parentDir = ""
	}
	parentDir = strings.TrimPrefix(parentDir, "/")

	// Joining with an empty parentDir leaves a trailing '/', which is dropped.
	fullDir := strings.TrimSuffix(
		fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, *input.Bucket, parentDir),
		"/",
	)
	return path.Base(objectPath), fullDir
}
// parsePartNumber extracts the part number from the name of a part entry.
//
// Two layouts are understood:
//   - "<number>_<anything>": the digits before the first underscore are used;
//   - "<number><multipartExt>": the trailing len(multipartExt) bytes are
//     dropped and the remainder is used.
//
// It returns an error when the selected prefix is not a decimal integer or,
// for the second layout, when fileName is too short to contain the extension
// at all (slicing it would otherwise panic).
func parsePartNumber(fileName string) (int, error) {
	if idx := strings.Index(fileName, "_"); idx != -1 {
		return strconv.Atoi(fileName[:idx])
	}
	if len(fileName) < len(multipartExt) {
		return 0, fmt.Errorf("invalid part file name %q: shorter than %q", fileName, multipartExt)
	}
	return strconv.Atoi(fileName[:len(fileName)-len(multipartExt)])
}
// abortMultipartUpload discards an in-progress multipart upload by removing
// its upload directory.
//
// It returns s3err.ErrNoSuchUpload when the existence check itself fails and
// s3err.ErrInternalError when the removal fails. An upload directory that
// does not exist is not an error: the call succeeds without removing anything.
func (s3a *S3ApiServer) abortMultipartUpload(input *s3.AbortMultipartUploadInput) (output *s3.AbortMultipartUploadOutput, code s3err.ErrorCode) {
	glog.V(2).Infof("abortMultipartUpload input %v", input)

	uploadsFolder := s3a.genUploadsFolder(*input.Bucket)

	found, checkErr := s3a.exists(uploadsFolder, *input.UploadId, true)
	if checkErr != nil {
		glog.V(1).Infof("bucket %s abort upload %s: %v", *input.Bucket, *input.UploadId, checkErr)
		return nil, s3err.ErrNoSuchUpload
	}

	if found {
		if rmErr := s3a.rm(uploadsFolder, *input.UploadId, true, true); rmErr != nil {
			glog.V(1).Infof("bucket %s remove upload %s: %v", *input.Bucket, *input.UploadId, rmErr)
			return nil, s3err.ErrInternalError
		}
	}

	return &s3.AbortMultipartUploadOutput{}, s3err.ErrNone
}
// ListMultipartUploadsResult is the XML response body produced by
// listMultipartUploads. The fields mirror s3.ListMultipartUploadsOutput, but
// the upload list is declared under the name Upload.
type ListMultipartUploadsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListMultipartUploadsResult"`
// copied from s3.ListMultipartUploadsOutput, the Uploads is not converting to <Upload></Upload>
Bucket *string `type:"string"`
Delimiter *string `type:"string"`
EncodingType *string `type:"string" enum:"EncodingType"`
// IsTruncated is set to true by listMultipartUploads once MaxUploads is reached.
IsTruncated *bool `type:"boolean"`
KeyMarker *string `type:"string"`
MaxUploads *int64 `type:"integer"`
NextKeyMarker *string `type:"string"`
// NextUploadIdMarker is the name of the entry at which a truncated listing stopped.
NextUploadIdMarker *string `type:"string"`
Prefix *string `type:"string"`
UploadIdMarker *string `type:"string"`
// Upload holds one element per reported in-progress upload.
Upload []*s3.MultipartUpload `locationName:"Upload" type:"list" flattened:"true"`
}
// listMultipartUploads reports the in-progress multipart uploads of a bucket.
//
// Entries of the bucket's uploads folder are listed starting from
// input.UploadIdMarker. An entry is reported when it has extended attributes
// and its stored object key passes the KeyMarker filter (when non-empty, the
// key must equal it) and the Prefix filter (when non-empty, the key must start
// with it). Listing stops, with IsTruncated set and NextUploadIdMarker
// pointing at the current entry, as soon as the number of reported uploads
// reaches input.MaxUploads.
//
// A listing error is logged and the (empty) result is returned with code left
// at its zero value.
func (s3a *S3ApiServer) listMultipartUploads(input *s3.ListMultipartUploadsInput) (output *ListMultipartUploadsResult, code s3err.ErrorCode) {
	// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html
	glog.V(2).Infof("listMultipartUploads input %v", input)

	output = &ListMultipartUploadsResult{
		Bucket:       input.Bucket,
		Delimiter:    input.Delimiter,
		EncodingType: input.EncodingType,
		KeyMarker:    input.KeyMarker,
		MaxUploads:   input.MaxUploads,
		Prefix:       input.Prefix,
		IsTruncated:  aws.Bool(false),
	}

	uploadDirs, _, listErr := s3a.list(s3a.genUploadsFolder(*input.Bucket), "", *input.UploadIdMarker, false, math.MaxInt32)
	if listErr != nil {
		glog.Errorf("listMultipartUploads %s error: %v", *input.Bucket, listErr)
		return output, code
	}

	var reported int64
	for _, uploadDir := range uploadDirs {
		if uploadDir.Extended != nil {
			objKey := string(uploadDir.Extended[s3_constants.ExtMultipartObjectKey])
			// Filtered-out entries skip the truncation check below as well.
			switch {
			case *input.KeyMarker != "" && *input.KeyMarker != objKey:
				continue
			case *input.Prefix != "" && !strings.HasPrefix(objKey, *input.Prefix):
				continue
			}
			output.Upload = append(output.Upload, &s3.MultipartUpload{
				Key:      objectKey(aws.String(objKey)),
				UploadId: aws.String(uploadDir.Name),
			})
			reported++
		}

		if reported < *input.MaxUploads {
			continue
		}
		// Limit reached: remember where to resume and stop.
		output.IsTruncated = aws.Bool(true)
		output.NextUploadIdMarker = aws.String(uploadDir.Name)
		break
	}
	return output, code
}
// ListPartsResult is the XML response body produced by listObjectParts. The
// fields mirror s3.ListPartsOutput, but the part list is declared under the
// name Part.
type ListPartsResult struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListPartsResult"`
// copied from s3.ListPartsOutput, the Parts is not converting to <Part></Part>
Bucket *string `type:"string"`
// IsTruncated is true when the directory listing reported more entries.
IsTruncated *bool `type:"boolean"`
Key *string `min:"1" type:"string"`
MaxParts *int64 `type:"integer"`
// NextPartNumberMarker is the number of the last returned part of a truncated listing.
NextPartNumberMarker *int64 `type:"integer"`
PartNumberMarker *int64 `type:"integer"`
// Part holds one element per returned part.
Part []*s3.Part `locationName:"Part" type:"list" flattened:"true"`
// StorageClass is always set to "STANDARD" by listObjectParts.
StorageClass *string `type:"string" enum:"StorageClass"`
UploadId *string `type:"string"`
}
// listObjectParts returns the parts uploaded so far for one multipart upload.
//
// The upload directory is listed starting from the entry name derived from
// input.PartNumberMarker (zero-padded to four digits plus the part suffix),
// limited to input.MaxParts entries. Every regular entry with the part suffix
// and a parsable part number becomes one element of the result. While the
// listing is not complete, NextPartNumberMarker follows the last part added.
//
// A listing error yields a nil output and s3err.ErrNoSuchUpload; otherwise
// code is left at its zero value.
func (s3a *S3ApiServer) listObjectParts(input *s3.ListPartsInput) (output *ListPartsResult, code s3err.ErrorCode) {
	// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListParts.html
	glog.V(2).Infof("listObjectParts input %v", input)

	output = &ListPartsResult{
		Bucket:           input.Bucket,
		Key:              objectKey(input.Key),
		UploadId:         input.UploadId,
		MaxParts:         input.MaxParts,         // the maximum number of parts to return.
		PartNumberMarker: input.PartNumberMarker, // the part number starts after this, exclusive
		StorageClass:     aws.String("STANDARD"),
	}

	uploadDir := s3a.genUploadsFolder(*input.Bucket) + "/" + *input.UploadId
	startFrom := fmt.Sprintf("%04d%s", *input.PartNumberMarker, multipartExt)
	entries, isLast, err := s3a.list(uploadDir, "", startFrom, false, uint32(*input.MaxParts))
	if err != nil {
		glog.Errorf("listObjectParts %s %s error: %v", *input.Bucket, *input.UploadId, err)
		return nil, s3err.ErrNoSuchUpload
	}

	// Note: the upload directory doubles as the marker that a multipart upload
	// exists, so an empty upload directory must not simply be deleted.

	output.IsTruncated = aws.Bool(!isLast)

	for _, entry := range entries {
		if entry.IsDirectory || !strings.HasSuffix(entry.Name, multipartExt) {
			continue
		}
		partNumber, parseErr := parsePartNumber(entry.Name)
		if parseErr != nil {
			glog.Errorf("listObjectParts %s %s parse %s: %v", *input.Bucket, *input.UploadId, entry.Name, parseErr)
			continue
		}

		partETag := filer.ETag(entry)
		partSize := filer.FileSize(entry)
		output.Part = append(output.Part, &s3.Part{
			PartNumber:   aws.Int64(int64(partNumber)),
			LastModified: aws.Time(time.Unix(entry.Attributes.Mtime, 0).UTC()),
			Size:         aws.Int64(int64(partSize)),
			ETag:         aws.String("\"" + partETag + "\""),
		})
		glog.V(3).Infof("listObjectParts: Added part %d, size=%d, etag=%s",
			partNumber, partSize, partETag)

		if !isLast {
			output.NextPartNumberMarker = aws.Int64(int64(partNumber))
		}
	}

	glog.V(2).Infof("listObjectParts: Returning %d parts for uploadId=%s", len(output.Part), *input.UploadId)
	return output, code
}
// maxInt reports the larger of a and b; when they are equal that common value
// is returned.
func maxInt(a, b int) int {
	larger := b
	if a > larger {
		larger = a
	}
	return larger
}
// MultipartEncryptionConfig holds pre-prepared encryption configuration to avoid error handling in callbacks
// It is built by prepareMultipartEncryptionConfig and handed to
// applyMultipartEncryptionConfig by createMultipartUpload.
type MultipartEncryptionConfig struct {
// SSE-KMS configuration
// IsSSEKMS is set when SSE-KMS applies to the upload.
IsSSEKMS bool
// KMSKeyID, BucketKeyEnabled and EncryptionContext are read from the
// corresponding request headers when SSE-KMS is requested explicitly.
KMSKeyID string
BucketKeyEnabled bool
EncryptionContext string
// KMSBaseIVEncoded is the base64 (standard encoding) form of a randomly
// generated base IV.
KMSBaseIVEncoded string
// SSE-S3 configuration
// IsSSES3 is set when SSE-S3 applies to the upload.
IsSSES3 bool
// NOTE(review): presumably the encoded base IV and key material for SSE-S3;
// the code that fills these two fields is not visible here - confirm.
S3BaseIVEncoded string
S3KeyDataEncoded string
}
// prepareMultipartEncryptionConfig prepares the encryption configuration for a
// multipart upload with proper error handling, so that the later apply step
// needs no error path (this eliminates the criticalError variable in callbacks).
//
// Explicit SSE-KMS / SSE-S3 request headers take priority. Only when neither is
// present is the bucket-default encryption consulted, which matches putToFiler.
//
// Parameters:
//   - r: the incoming request whose headers are inspected.
//   - bucket: bucket whose default encryption config is the fallback.
//   - uploadIdString: used only in log messages.
//
// It returns a non-nil config (possibly with no encryption enabled) on success,
// or nil and an error when IV generation, SSE-S3 key preparation, or reading the
// bucket encryption config fails. Errors from key preparation and config reading
// are wrapped with %w so callers can inspect them with errors.Is / errors.As.
func (s3a *S3ApiServer) prepareMultipartEncryptionConfig(r *http.Request, bucket string, uploadIdString string) (*MultipartEncryptionConfig, error) {
	config := &MultipartEncryptionConfig{}

	// scopeLabels returns the wording that distinguishes explicit requests from
	// bucket defaults. Error messages omit the scope for explicit requests,
	// while log lines always name it.
	scopeLabels := func(bucketDefault bool) (errScope, logScope string) {
		if bucketDefault {
			return "bucket-default ", "bucket-default"
		}
		return "", "explicit"
	}

	// newBaseIV generates a random base IV of AESBlockSize bytes and returns it
	// base64-encoded. algorithm is "SSE-KMS" or "SSE-S3" and only affects messages.
	newBaseIV := func(algorithm string, bucketDefault bool) (string, error) {
		errScope, logScope := scopeLabels(bucketDefault)
		baseIV := make([]byte, s3_constants.AESBlockSize)
		n, err := rand.Read(baseIV)
		if err != nil || n != len(baseIV) {
			// %v rather than %w: err can be nil here when only the length check failed.
			return "", fmt.Errorf("failed to generate secure IV for %s%s multipart upload: %v (read %d/%d bytes)", errScope, algorithm, err, n, len(baseIV))
		}
		glog.V(4).Infof("Generated base IV %x for %s %s multipart upload %s", baseIV[:8], logScope, algorithm, uploadIdString)
		return base64.StdEncoding.EncodeToString(baseIV), nil
	}

	// newSSES3KeyData obtains an SSE-S3 key, serializes its metadata, stores the
	// key in the key manager for later retrieval, and returns the metadata
	// base64-encoded.
	newSSES3KeyData := func(bucketDefault bool) (string, error) {
		errScope, logScope := scopeLabels(bucketDefault)
		keyManager := GetSSES3KeyManager()
		sseS3Key, keyErr := keyManager.GetOrCreateKey("")
		if keyErr != nil {
			return "", fmt.Errorf("failed to generate SSE-S3 key for %smultipart upload: %w", errScope, keyErr)
		}
		keyData, serErr := SerializeSSES3Metadata(sseS3Key)
		if serErr != nil {
			return "", fmt.Errorf("failed to serialize SSE-S3 metadata for %smultipart upload: %w", errScope, serErr)
		}
		// Store key in manager for later retrieval
		keyManager.StoreKey(sseS3Key)
		glog.V(4).Infof("Stored SSE-S3 key %s for %s multipart upload %s", sseS3Key.KeyID, logScope, uploadIdString)
		return base64.StdEncoding.EncodeToString(keyData), nil
	}

	// Check for explicit encryption headers first (priority over bucket defaults)
	hasExplicitSSEKMS := IsSSEKMSRequest(r)
	hasExplicitSSES3 := IsSSES3RequestInternal(r)

	// Prepare SSE-KMS configuration (explicit request headers)
	if hasExplicitSSEKMS {
		config.IsSSEKMS = true
		config.KMSKeyID = r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
		config.BucketKeyEnabled = strings.EqualFold(r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled), "true")
		config.EncryptionContext = r.Header.Get(s3_constants.AmzServerSideEncryptionContext)

		encodedIV, err := newBaseIV("SSE-KMS", false)
		if err != nil {
			return nil, err
		}
		config.KMSBaseIVEncoded = encodedIV
	}

	// Prepare SSE-S3 configuration (explicit request headers)
	if hasExplicitSSES3 {
		config.IsSSES3 = true

		encodedIV, err := newBaseIV("SSE-S3", false)
		if err != nil {
			return nil, err
		}
		config.S3BaseIVEncoded = encodedIV

		encodedKey, err := newSSES3KeyData(false)
		if err != nil {
			return nil, err
		}
		config.S3KeyDataEncoded = encodedKey
	}

	// Explicit headers win; bucket defaults are only a fallback.
	if hasExplicitSSEKMS || hasExplicitSSES3 {
		return config, nil
	}

	// If no explicit encryption headers, check bucket-default encryption
	// This matches AWS S3 behavior and putToFiler() implementation
	encryptionConfig, err := s3a.GetBucketEncryptionConfig(bucket)
	if err != nil {
		if errors.Is(err, ErrNoEncryptionConfig) {
			// No default encryption configured, continue without encryption
			return config, nil
		}
		// Real error - propagate to prevent silent encryption bypass
		return nil, fmt.Errorf("failed to read bucket encryption config for multipart upload: %w", err)
	}
	if encryptionConfig == nil || encryptionConfig.SseAlgorithm == "" {
		return config, nil
	}

	glog.V(3).Infof("prepareMultipartEncryptionConfig: applying bucket-default encryption %s for bucket %s, upload %s",
		encryptionConfig.SseAlgorithm, bucket, uploadIdString)

	switch encryptionConfig.SseAlgorithm {
	case EncryptionTypeKMS:
		// Apply SSE-KMS as bucket default (no encryption context for bucket defaults)
		config.IsSSEKMS = true
		config.KMSKeyID = encryptionConfig.KmsKeyId
		config.BucketKeyEnabled = encryptionConfig.BucketKeyEnabled

		encodedIV, ivErr := newBaseIV("SSE-KMS", true)
		if ivErr != nil {
			return nil, ivErr
		}
		config.KMSBaseIVEncoded = encodedIV

	case EncryptionTypeAES256:
		// Apply SSE-S3 (AES256) as bucket default
		config.IsSSES3 = true

		encodedIV, ivErr := newBaseIV("SSE-S3", true)
		if ivErr != nil {
			return nil, ivErr
		}
		config.S3BaseIVEncoded = encodedIV

		encodedKey, keyErr := newSSES3KeyData(true)
		if keyErr != nil {
			return nil, keyErr
		}
		config.S3KeyDataEncoded = encodedKey

	default:
		// Unknown algorithm: log and leave the upload unencrypted, as before.
		glog.V(3).Infof("prepareMultipartEncryptionConfig: unsupported bucket-default encryption algorithm %s for bucket %s",
			encryptionConfig.SseAlgorithm, bucket)
	}

	return config, nil
}
// applyMultipartEncryptionConfig copies a pre-prepared encryption configuration
// into the Extended attributes of a filer entry.
//
// All error-prone work (IV generation, key serialization) was done during
// preparation, so this step only writes map entries. entry.Extended is
// allocated here when it is nil and there is something to write; assigning into
// a nil map would otherwise panic. When config enables neither SSE-KMS nor
// SSE-S3 the entry is left untouched.
func (s3a *S3ApiServer) applyMultipartEncryptionConfig(entry *filer_pb.Entry, config *MultipartEncryptionConfig) {
	if !config.IsSSEKMS && !config.IsSSES3 {
		return
	}
	if entry.Extended == nil {
		entry.Extended = make(map[string][]byte)
	}

	// Apply SSE-KMS configuration
	if config.IsSSEKMS {
		entry.Extended[s3_constants.SeaweedFSSSEKMSKeyID] = []byte(config.KMSKeyID)
		// The optional attributes are only written when set, so their absence
		// on the entry means "not enabled" / "no context".
		if config.BucketKeyEnabled {
			entry.Extended[s3_constants.SeaweedFSSSEKMSBucketKeyEnabled] = []byte("true")
		}
		if config.EncryptionContext != "" {
			entry.Extended[s3_constants.SeaweedFSSSEKMSEncryptionContext] = []byte(config.EncryptionContext)
		}
		entry.Extended[s3_constants.SeaweedFSSSEKMSBaseIV] = []byte(config.KMSBaseIVEncoded)
		glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-KMS settings with keyID %s", config.KMSKeyID)
	}

	// Apply SSE-S3 configuration
	if config.IsSSES3 {
		entry.Extended[s3_constants.SeaweedFSSSES3Encryption] = []byte(s3_constants.SSEAlgorithmAES256)
		entry.Extended[s3_constants.SeaweedFSSSES3BaseIV] = []byte(config.S3BaseIVEncoded)
		entry.Extended[s3_constants.SeaweedFSSSES3KeyData] = []byte(config.S3KeyDataEncoded)
		glog.V(3).Infof("applyMultipartEncryptionConfig: applied SSE-S3 settings")
	}
}
// sortEntriesByLatestChunk reorders entries in place so that the entry whose
// first chunk carries the largest ModifiedTsNs comes first. Entries without
// any chunks are ranked with a timestamp of 0.
func sortEntriesByLatestChunk(entries []*filer_pb.Entry) {
	firstChunkTs := func(e *filer_pb.Entry) int64 {
		if len(e.Chunks) == 0 {
			return 0
		}
		return e.Chunks[0].ModifiedTsNs
	}
	// Arguments are swapped in the comparison to get descending order.
	slices.SortFunc(entries, func(x, y *filer_pb.Entry) int {
		return cmp.Compare(firstChunkTs(y), firstChunkTs(x))
	})
}
// calculateMultipartETag builds the ETag of a completed multipart object in the
// form "<hex md5>-<count>".
//
// For every part number in completedPartNumbers (in the given order) it picks
// one entry from partEntries, preferring the one ranked first by
// sortEntriesByLatestChunk when there are several, and takes its ETag with
// surrounding quotes and any "-suffix" removed. The hex-decoded ETags are
// hashed together with MD5. Parts with no entry, or whose ETag is not valid
// hex, contribute nothing to the hash (the latter is logged as a warning), but
// the count suffix is always len(completedPartNumbers).
//
// Note that slices inside partEntries holding more than one entry are
// reordered in place.
func calculateMultipartETag(partEntries map[int][]*filer_pb.Entry, completedPartNumbers []int) string {
	digest := md5.New()
	for _, partNumber := range completedPartNumbers {
		candidates := partEntries[partNumber]
		switch len(candidates) {
		case 0:
			continue
		case 1:
			// nothing to rank
		default:
			sortEntriesByLatestChunk(candidates)
		}
		chosen := candidates[0]

		partETag := getEtagFromEntry(chosen)
		glog.V(4).Infof("calculateMultipartETag: part %d, entry %s, getEtagFromEntry result: %s", partNumber, chosen.Name, partETag)

		partETag = strings.Trim(partETag, "\"")
		// Keep only the hash portion of an ETag that carries a "-N" suffix.
		if dash := strings.IndexByte(partETag, '-'); dash >= 0 {
			partETag = partETag[:dash]
		}

		raw, err := hex.DecodeString(partETag)
		if err != nil {
			glog.Warningf("calculateMultipartETag: failed to decode etag '%s' for part %d: %v", partETag, partNumber, err)
			continue
		}
		digest.Write(raw)
	}
	return fmt.Sprintf("%x-%d", digest.Sum(nil), len(completedPartNumbers))
}
// getEtagFromEntry returns the ETag of entry wrapped in double quotes.
//
// A non-empty value stored under s3_constants.ExtETagKey in entry.Extended
// wins: it is returned as-is when it already starts with a double quote and
// wrapped in quotes otherwise. When that value is missing or empty, the result
// of filer.ETag(entry) is quoted and returned instead.
func getEtagFromEntry(entry *filer_pb.Entry) string {
	// Indexing a nil map yields the zero value, so a missing Extended map and
	// a missing or empty key all land on the fallback below.
	if stored := string(entry.Extended[s3_constants.ExtETagKey]); stored != "" {
		if strings.HasPrefix(stored, "\"") {
			return stored
		}
		return "\"" + stored + "\""
	}

	// Fallback to filer.ETag which handles Attributes.Md5 consistently
	computed := filer.ETag(entry)
	label := entry.Name
	if label == "" {
		label = "entry"
	}
	glog.V(4).Infof("getEtagFromEntry: fallback to filer.ETag for %s: %s, chunkCount: %d", label, computed, len(entry.Chunks))
	return "\"" + computed + "\""
}