s3api: ensure MD5 is calculated or reused during CopyObject (#8163)
* s3api: ensure MD5 is calculated or reused during CopyObject Fixes #8155 - Capture and reuse source MD5 for direct copies - Calculate MD5 for small inline objects during copy * s3api: refactor encryption logic and safe MD5 copying - Extract duplicated bucket default encryption logic into helper - Use safe append copy for MD5 slice to avoid shared modifications * refactor * avoids unnecessary MD5 recalculations for small files
This commit is contained in:
@@ -155,6 +155,20 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Determine whether we can reuse the source MD5 (direct copy without encryption changes).
|
||||||
|
canReuseSourceMd5 := false
|
||||||
|
var sourceMd5 []byte
|
||||||
|
if entry.Attributes != nil && len(entry.Attributes.Md5) > 0 {
|
||||||
|
sourceMd5 = append([]byte(nil), entry.Attributes.Md5...)
|
||||||
|
srcPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject)
|
||||||
|
dstPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject)
|
||||||
|
state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath)
|
||||||
|
s3a.applyCopyBucketDefaultEncryption(state, dstBucket)
|
||||||
|
if strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r); err == nil && strategy == CopyStrategyDirect {
|
||||||
|
canReuseSourceMd5 = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create new entry for destination
|
// Create new entry for destination
|
||||||
dstEntry := &filer_pb.Entry{
|
dstEntry := &filer_pb.Entry{
|
||||||
Attributes: &filer_pb.FuseAttributes{
|
Attributes: &filer_pb.FuseAttributes{
|
||||||
@@ -237,9 +251,17 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if dstEntry.Attributes != nil {
|
||||||
|
if len(dstEntry.Attributes.Md5) == 0 && canReuseSourceMd5 {
|
||||||
|
dstEntry.Attributes.Md5 = append([]byte(nil), sourceMd5...)
|
||||||
|
} else if uint64(len(dstEntry.Content)) == dstEntry.Attributes.FileSize {
|
||||||
|
dstEntry.Attributes.Md5 = util.Md5(dstEntry.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Use unified copy strategy approach
|
// Use unified copy strategy approach
|
||||||
dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, dstBucket, srcObject, dstObject)
|
dstChunks, dstMetadata, copyErr := s3a.executeUnifiedCopyStrategy(entry, r, srcBucket, dstBucket, srcObject, dstObject)
|
||||||
if copyErr != nil {
|
if copyErr != nil {
|
||||||
glog.Errorf("CopyObjectHandler unified copy error: %v", copyErr)
|
glog.Errorf("CopyObjectHandler unified copy error: %v", copyErr)
|
||||||
// Map errors to appropriate S3 errors
|
// Map errors to appropriate S3 errors
|
||||||
@@ -257,6 +279,10 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
|||||||
}
|
}
|
||||||
glog.V(2).Infof("Applied %d destination metadata entries for copy: %s", len(dstMetadata), r.URL.Path)
|
glog.V(2).Infof("Applied %d destination metadata entries for copy: %s", len(dstMetadata), r.URL.Path)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if dstEntry.Attributes != nil && len(dstEntry.Attributes.Md5) == 0 && canReuseSourceMd5 {
|
||||||
|
dstEntry.Attributes.Md5 = append([]byte(nil), sourceMd5...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if destination bucket has versioning enabled
|
// Check if destination bucket has versioning enabled
|
||||||
|
|||||||
@@ -13,26 +13,16 @@ import (
|
|||||||
|
|
||||||
// executeUnifiedCopyStrategy executes the appropriate copy strategy based on encryption state
|
// executeUnifiedCopyStrategy executes the appropriate copy strategy based on encryption state
|
||||||
// Returns chunks and destination metadata that should be applied to the destination entry
|
// Returns chunks and destination metadata that should be applied to the destination entry
|
||||||
func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, r *http.Request, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, r *http.Request, srcBucket, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
||||||
// Detect encryption state (using entry-aware detection for multipart objects)
|
// Detect encryption state (using entry-aware detection for multipart objects)
|
||||||
srcPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, r.Header.Get("X-Amz-Copy-Source-Bucket"), srcObject)
|
srcPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, srcBucket, srcObject)
|
||||||
dstPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject)
|
dstPath := fmt.Sprintf("%s/%s/%s", s3a.option.BucketsPath, dstBucket, dstObject)
|
||||||
state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath)
|
state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath)
|
||||||
|
|
||||||
// Debug logging for encryption state
|
// Debug logging for encryption state
|
||||||
|
|
||||||
// Apply bucket default encryption if no explicit encryption specified
|
// Apply bucket default encryption if no explicit encryption specified
|
||||||
if !state.IsTargetEncrypted() {
|
s3a.applyCopyBucketDefaultEncryption(state, dstBucket)
|
||||||
bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
|
|
||||||
if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
|
|
||||||
switch bucketMetadata.Encryption.SseAlgorithm {
|
|
||||||
case "aws:kms":
|
|
||||||
state.DstSSEKMS = true
|
|
||||||
case "AES256":
|
|
||||||
state.DstSSES3 = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Determine copy strategy
|
// Determine copy strategy
|
||||||
strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r)
|
strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r)
|
||||||
@@ -169,3 +159,18 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ
|
|||||||
glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
|
glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
|
||||||
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
|
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// applyCopyBucketDefaultEncryption applies the destination bucket's default encryption settings if no explicit encryption is specified
|
||||||
|
func (s3a *S3ApiServer) applyCopyBucketDefaultEncryption(state *EncryptionState, dstBucket string) {
|
||||||
|
if !state.IsTargetEncrypted() {
|
||||||
|
bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
|
||||||
|
if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
|
||||||
|
switch bucketMetadata.Encryption.SseAlgorithm {
|
||||||
|
case "aws:kms":
|
||||||
|
state.DstSSEKMS = true
|
||||||
|
case "AES256":
|
||||||
|
state.DstSSES3 = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user