Fix SSE-S3 copy: preserve encryption metadata and set chunk SSE type (#7598)
* Fix SSE-S3 copy: preserve encryption metadata and set chunk SSE type Fixes GitHub #7562: Copying objects between encrypted buckets was failing. Root causes: 1. processMetadataBytes was re-adding SSE headers from source entry, undoing the encryption header filtering. Now uses dstEntry.Extended which is already filtered. 2. SSE-S3 streaming copy returned nil metadata. Now properly generates and returns SSE-S3 destination metadata (SeaweedFSSSES3Key, AES256 header) via ExecuteStreamingCopyWithMetadata. 3. Chunks created during streaming copy didn't have SseType set. Now sets SseType and per-chunk SseMetadata with chunk-specific IVs for SSE-S3, enabling proper decryption on GetObject. * Address review: make SSE-S3 metadata serialization failures fatal errors - In executeEncryptCopy: return error instead of just logging if SerializeSSES3Metadata fails - In createChunkFromData: return error if chunk SSE-S3 metadata serialization fails This ensures objects/chunks are never created without proper encryption metadata, preventing unreadable/corrupted data. * fmt * Refactor: reuse function names instead of creating WithMetadata variants - Change ExecuteStreamingCopy to return (*EncryptionSpec, error) directly - Remove ExecuteStreamingCopyWithMetadata wrapper - Change executeStreamingReencryptCopy to return (*EncryptionSpec, error) - Remove executeStreamingReencryptCopyWithMetadata wrapper - Update callers to ignore encryption spec with _ where not needed * Add TODO documenting large file SSE-S3 copy limitation The streaming copy approach encrypts the entire stream with a single IV but stores data in chunks with per-chunk IVs. This causes decryption issues for large files. Small inline files work correctly. This is a known architectural issue that needs separate work to fix. * Use chunk-by-chunk encryption for SSE-S3 copy (consistent with SSE-C/SSE-KMS) Instead of streaming encryption (which had IV mismatch issues for multi-chunk files), SSE-S3 now uses the same chunk-by-chunk approach as SSE-C and SSE-KMS: 1. Extended copyMultipartCrossEncryption to handle SSE-S3: - Added SSE-S3 source decryption in copyCrossEncryptionChunk - Added SSE-S3 destination encryption with per-chunk IVs - Added object-level metadata generation for SSE-S3 destinations 2. Updated routing in executeEncryptCopy/executeDecryptCopy/executeReencryptCopy to use copyMultipartCrossEncryption for all SSE-S3 scenarios 3. Removed streaming copy functions (shouldUseStreamingCopy, executeStreamingReencryptCopy) as they're no longer used 4. Added large file (1MB) integration test to verify chunk-by-chunk copy works This ensures consistent behavior across all SSE types and fixes data corruption that occurred with large files in the streaming copy approach. * fmt * fmt * Address review: fail explicitly if SSE-S3 metadata is missing Instead of silently ignoring missing SSE-S3 metadata (which could create unreadable objects), now explicitly fail the copy operation with a clear error message if: - First chunk is missing - First chunk doesn't have SSE-S3 type - First chunk has empty SSE metadata - Deserialization fails * Address review: improve comment to reflect full scope of chunk creation * Address review: fail explicitly if baseIV is empty for SSE-S3 chunk encryption If DestinationIV is not set when encrypting SSE-S3 chunks, the chunk would be created without SseMetadata, causing GetObject decryption to fail later. Now fails explicitly with a clear error message. Note: calculateIVWithOffset returns ([]byte, int) not ([]byte, error) - the int is a skip amount for intra-block alignment, not an error code. * Address review: handle 0-byte files in SSE-S3 copy For 0-byte files, there are no chunks to get metadata from. Generate an IV for the object-level metadata to ensure even empty files are properly marked as SSE-S3 encrypted. Also validate that we don't have a non-empty file with no chunks (which would indicate an internal error).
This commit is contained in:
@@ -2082,6 +2082,78 @@ func TestCopyToBucketDefaultEncryptedRegression(t *testing.T) {
|
||||
require.NoError(t, err, "Failed to read object")
|
||||
assertDataEqual(t, testData, data, "Data mismatch")
|
||||
})
|
||||
|
||||
t.Run("LargeFileCopyEncrypted_ToTemp_ToEncrypted", func(t *testing.T) {
|
||||
// Test with large file (1MB) to exercise chunk-by-chunk copy path
|
||||
// This verifies consistent behavior with SSE-C and SSE-KMS
|
||||
largeTestData := generateTestData(1024 * 1024) // 1MB
|
||||
objectKey := "large-file-test.bin"
|
||||
|
||||
// Step 1: Upload large object to source bucket (will be automatically encrypted)
|
||||
_, err = client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(srcBucket),
|
||||
Key: aws.String(objectKey),
|
||||
Body: bytes.NewReader(largeTestData),
|
||||
})
|
||||
require.NoError(t, err, "Failed to upload large file to source bucket")
|
||||
|
||||
// Verify source object is encrypted
|
||||
srcHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(srcBucket),
|
||||
Key: aws.String(objectKey),
|
||||
})
|
||||
require.NoError(t, err, "Failed to HEAD source object")
|
||||
assert.Equal(t, types.ServerSideEncryptionAes256, srcHead.ServerSideEncryption,
|
||||
"Source object should be SSE-S3 encrypted")
|
||||
|
||||
// Step 2: Copy to temp bucket (unencrypted) - exercises chunk-by-chunk decrypt
|
||||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{
|
||||
Bucket: aws.String(tempBucket),
|
||||
Key: aws.String(objectKey),
|
||||
CopySource: aws.String(fmt.Sprintf("%s/%s", srcBucket, objectKey)),
|
||||
})
|
||||
require.NoError(t, err, "Failed to copy large file to temp bucket")
|
||||
|
||||
// Verify temp object is unencrypted and data is correct
|
||||
tempGet, err := client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(tempBucket),
|
||||
Key: aws.String(objectKey),
|
||||
})
|
||||
require.NoError(t, err, "Failed to GET temp object")
|
||||
tempData, err := io.ReadAll(tempGet.Body)
|
||||
tempGet.Body.Close()
|
||||
require.NoError(t, err, "Failed to read temp object")
|
||||
assertDataEqual(t, largeTestData, tempData, "Temp object data mismatch after decrypt")
|
||||
|
||||
// Step 3: Copy from temp bucket to dest bucket (with default encryption)
|
||||
// This exercises chunk-by-chunk encrypt copy
|
||||
_, err = client.CopyObject(ctx, &s3.CopyObjectInput{
|
||||
Bucket: aws.String(dstBucket),
|
||||
Key: aws.String(objectKey),
|
||||
CopySource: aws.String(fmt.Sprintf("%s/%s", tempBucket, objectKey)),
|
||||
})
|
||||
require.NoError(t, err, "Failed to copy large file to destination bucket")
|
||||
|
||||
// Verify destination object is encrypted
|
||||
dstHead, err := client.HeadObject(ctx, &s3.HeadObjectInput{
|
||||
Bucket: aws.String(dstBucket),
|
||||
Key: aws.String(objectKey),
|
||||
})
|
||||
require.NoError(t, err, "Failed to HEAD destination object")
|
||||
assert.Equal(t, types.ServerSideEncryptionAes256, dstHead.ServerSideEncryption,
|
||||
"Destination object should be SSE-S3 encrypted via bucket default")
|
||||
|
||||
// Verify destination object content is correct after re-encryption
|
||||
dstGet, err := client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(dstBucket),
|
||||
Key: aws.String(objectKey),
|
||||
})
|
||||
require.NoError(t, err, "Failed to GET destination object")
|
||||
dstData, err := io.ReadAll(dstGet.Body)
|
||||
dstGet.Body.Close()
|
||||
require.NoError(t, err, "Failed to read destination object")
|
||||
assertDataEqual(t, largeTestData, dstData, "Large file data mismatch after re-encryption")
|
||||
})
|
||||
}
|
||||
|
||||
// REGRESSION TESTS FOR CRITICAL BUGS FIXED
|
||||
|
||||
@@ -199,7 +199,9 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
}
|
||||
|
||||
// Process metadata and tags and apply to destination
|
||||
processedMetadata, tagErr := processMetadataBytes(r.Header, entry.Extended, replaceMeta, replaceTagging)
|
||||
// Use dstEntry.Extended (already filtered) as the source, not entry.Extended,
|
||||
// to preserve the encryption header filtering. Fixes GitHub #7562.
|
||||
processedMetadata, tagErr := processMetadataBytes(r.Header, dstEntry.Extended, replaceMeta, replaceTagging)
|
||||
if tagErr != nil {
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInvalidCopySource)
|
||||
return
|
||||
@@ -1522,7 +1524,7 @@ func (s3a *S3ApiServer) copyMultipartSSECChunk(chunk *filer_pb.FileChunk, copySo
|
||||
}
|
||||
|
||||
// copyMultipartCrossEncryption handles all cross-encryption and decrypt-only copy scenarios
|
||||
// This unified function supports: SSE-C↔SSE-KMS, SSE-C→Plain, SSE-KMS→Plain
|
||||
// This unified function supports: SSE-C↔SSE-KMS↔SSE-S3, and any→Plain
|
||||
func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
||||
var dstChunks []*filer_pb.FileChunk
|
||||
|
||||
@@ -1531,6 +1533,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
|
||||
var destKMSKeyID string
|
||||
var destKMSEncryptionContext map[string]string
|
||||
var destKMSBucketKeyEnabled bool
|
||||
var destSSES3Key *SSES3Key
|
||||
|
||||
if state.DstSSEC {
|
||||
var err error
|
||||
@@ -1544,7 +1547,13 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to parse destination SSE-KMS headers: %w", err)
|
||||
}
|
||||
} else {
|
||||
} else if state.DstSSES3 {
|
||||
// Generate SSE-S3 key for destination
|
||||
var err error
|
||||
destSSES3Key, err = GenerateSSES3Key()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to generate SSE-S3 key: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Parse source encryption parameters
|
||||
@@ -1563,12 +1572,18 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
|
||||
var err error
|
||||
|
||||
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
|
||||
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
|
||||
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, sourceSSECKey, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
|
||||
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
|
||||
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, dstPath, dstBucket, state)
|
||||
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
|
||||
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
|
||||
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
|
||||
} else {
|
||||
// Unencrypted chunk, copy directly
|
||||
copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
|
||||
// Unencrypted chunk - may need encryption if destination requires it
|
||||
if state.DstSSEC || state.DstSSEKMS || state.DstSSES3 {
|
||||
copiedChunk, err = s3a.copyCrossEncryptionChunk(chunk, nil, destSSECKey, destKMSKeyID, destKMSEncryptionContext, destKMSBucketKeyEnabled, destSSES3Key, dstPath, dstBucket, state)
|
||||
} else {
|
||||
copiedChunk, err = s3a.copySingleChunk(chunk, dstPath)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@@ -1619,6 +1634,40 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
|
||||
} else {
|
||||
glog.Errorf("Failed to serialize SSE-KMS metadata: %v", serErr)
|
||||
}
|
||||
} else if state.DstSSES3 && destSSES3Key != nil {
|
||||
// For SSE-S3 destination, create object-level metadata
|
||||
var sses3Metadata *SSES3Key
|
||||
if len(dstChunks) == 0 {
|
||||
// Handle 0-byte files - generate IV for metadata even though there's no content to encrypt
|
||||
if entry.Attributes.FileSize != 0 {
|
||||
return nil, nil, fmt.Errorf("internal error: no chunks created for non-empty SSE-S3 destination object")
|
||||
}
|
||||
// Generate IV for 0-byte object metadata
|
||||
iv := make([]byte, s3_constants.AESBlockSize)
|
||||
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
|
||||
return nil, nil, fmt.Errorf("generate IV for 0-byte object: %w", err)
|
||||
}
|
||||
destSSES3Key.IV = iv
|
||||
sses3Metadata = destSSES3Key
|
||||
} else {
|
||||
// For non-empty objects, use the first chunk's metadata
|
||||
if dstChunks[0].GetSseType() != filer_pb.SSEType_SSE_S3 || len(dstChunks[0].GetSseMetadata()) == 0 {
|
||||
return nil, nil, fmt.Errorf("internal error: first chunk is missing expected SSE-S3 metadata for destination object")
|
||||
}
|
||||
keyManager := GetSSES3KeyManager()
|
||||
var err error
|
||||
sses3Metadata, err = DeserializeSSES3Metadata(dstChunks[0].GetSseMetadata(), keyManager)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to deserialize SSE-S3 metadata from first chunk: %w", err)
|
||||
}
|
||||
}
|
||||
// Use the derived key with its IV for object-level metadata
|
||||
keyData, serErr := SerializeSSES3Metadata(sses3Metadata)
|
||||
if serErr != nil {
|
||||
return nil, nil, fmt.Errorf("failed to serialize SSE-S3 metadata: %w", serErr)
|
||||
}
|
||||
dstMetadata[s3_constants.SeaweedFSSSES3Key] = keyData
|
||||
dstMetadata[s3_constants.AmzServerSideEncryption] = []byte("AES256")
|
||||
}
|
||||
// For unencrypted destination, no metadata needed (dstMetadata remains empty)
|
||||
|
||||
@@ -1626,7 +1675,7 @@ func (s3a *S3ApiServer) copyMultipartCrossEncryption(entry *filer_pb.Entry, r *h
|
||||
}
|
||||
|
||||
// copyCrossEncryptionChunk handles copying a single chunk with cross-encryption support
|
||||
func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
|
||||
func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sourceSSECKey *SSECustomerKey, destSSECKey *SSECustomerKey, destKMSKeyID string, destKMSEncryptionContext map[string]string, destKMSBucketKeyEnabled bool, destSSES3Key *SSES3Key, dstPath, dstBucket string, state *EncryptionState) (*filer_pb.FileChunk, error) {
|
||||
// Create destination chunk
|
||||
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
||||
|
||||
@@ -1726,6 +1775,30 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
|
||||
previewLen = len(finalData)
|
||||
}
|
||||
|
||||
} else if chunk.GetSseType() == filer_pb.SSEType_SSE_S3 {
|
||||
// Decrypt SSE-S3 source
|
||||
if len(chunk.GetSseMetadata()) == 0 {
|
||||
return nil, fmt.Errorf("SSE-S3 chunk missing per-chunk metadata")
|
||||
}
|
||||
|
||||
keyManager := GetSSES3KeyManager()
|
||||
sourceSSEKey, err := DeserializeSSES3Metadata(chunk.GetSseMetadata(), keyManager)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deserialize SSE-S3 metadata: %w", err)
|
||||
}
|
||||
|
||||
decryptedReader, decErr := CreateSSES3DecryptedReader(bytes.NewReader(encryptedData), sourceSSEKey, sourceSSEKey.IV)
|
||||
if decErr != nil {
|
||||
return nil, fmt.Errorf("create SSE-S3 decrypted reader: %w", decErr)
|
||||
}
|
||||
|
||||
decryptedData, readErr := io.ReadAll(decryptedReader)
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("decrypt SSE-S3 chunk data: %w", readErr)
|
||||
}
|
||||
finalData = decryptedData
|
||||
glog.V(4).Infof("Decrypted SSE-S3 chunk, size: %d", len(finalData))
|
||||
|
||||
} else {
|
||||
// Source is unencrypted
|
||||
finalData = encryptedData
|
||||
@@ -1787,6 +1860,36 @@ func (s3a *S3ApiServer) copyCrossEncryptionChunk(chunk *filer_pb.FileChunk, sour
|
||||
dstChunk.SseMetadata = kmsMetadata
|
||||
|
||||
glog.V(4).Infof("Re-encrypted chunk with SSE-KMS")
|
||||
|
||||
} else if state.DstSSES3 && destSSES3Key != nil {
|
||||
// Encrypt with SSE-S3
|
||||
encryptedReader, iv, encErr := CreateSSES3EncryptedReader(bytes.NewReader(finalData), destSSES3Key)
|
||||
if encErr != nil {
|
||||
return nil, fmt.Errorf("create SSE-S3 encrypted reader: %w", encErr)
|
||||
}
|
||||
|
||||
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("re-encrypt with SSE-S3: %w", readErr)
|
||||
}
|
||||
finalData = reencryptedData
|
||||
|
||||
// Create per-chunk SSE-S3 metadata with chunk-specific IV
|
||||
chunkSSEKey := &SSES3Key{
|
||||
Key: destSSES3Key.Key,
|
||||
KeyID: destSSES3Key.KeyID,
|
||||
Algorithm: destSSES3Key.Algorithm,
|
||||
IV: iv,
|
||||
}
|
||||
sses3Metadata, err := SerializeSSES3Metadata(chunkSSEKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("serialize SSE-S3 metadata: %w", err)
|
||||
}
|
||||
|
||||
dstChunk.SseType = filer_pb.SSEType_SSE_S3
|
||||
dstChunk.SseMetadata = sses3Metadata
|
||||
|
||||
glog.V(4).Infof("Re-encrypted chunk with SSE-S3")
|
||||
}
|
||||
// For unencrypted destination, finalData remains as decrypted plaintext
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
@@ -133,9 +132,9 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
|
||||
}
|
||||
|
||||
if state.DstSSES3 {
|
||||
// Use streaming copy for SSE-S3 encryption
|
||||
chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
|
||||
return chunks, nil, err
|
||||
// Use chunk-by-chunk copy for SSE-S3 encryption (consistent with SSE-C and SSE-KMS)
|
||||
glog.V(2).Infof("Plain→SSE-S3 copy: using unified multipart encrypt copy")
|
||||
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
|
||||
}
|
||||
|
||||
return nil, nil, fmt.Errorf("unknown target encryption type")
|
||||
@@ -143,30 +142,18 @@ func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Reques
|
||||
|
||||
// executeDecryptCopy handles encrypted → plain copies
|
||||
func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
||||
// Use unified multipart-aware decrypt copy for all encryption types
|
||||
if state.SrcSSEC || state.SrcSSEKMS {
|
||||
// Use unified multipart-aware decrypt copy for all encryption types (consistent chunk-by-chunk)
|
||||
if state.SrcSSEC || state.SrcSSEKMS || state.SrcSSES3 {
|
||||
glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
|
||||
return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
|
||||
}
|
||||
|
||||
if state.SrcSSES3 {
|
||||
// Use streaming copy for SSE-S3 decryption
|
||||
chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
|
||||
return chunks, nil, err
|
||||
}
|
||||
|
||||
return nil, nil, fmt.Errorf("unknown source encryption type")
|
||||
}
|
||||
|
||||
// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
|
||||
func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
|
||||
// Check if we should use streaming copy for better performance
|
||||
if s3a.shouldUseStreamingCopy(entry, state) {
|
||||
chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
|
||||
return chunks, nil, err
|
||||
}
|
||||
|
||||
// Fallback to chunk-by-chunk approach for compatibility
|
||||
// Use chunk-by-chunk approach for all cross-encryption scenarios (consistent behavior)
|
||||
if state.SrcSSEC && state.DstSSEC {
|
||||
return s3a.copyChunksWithSSEC(entry, r)
|
||||
}
|
||||
@@ -177,83 +164,8 @@ func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Requ
|
||||
return chunks, dstMetadata, err
|
||||
}
|
||||
|
||||
if state.SrcSSEC && state.DstSSEKMS {
|
||||
// SSE-C → SSE-KMS: use unified multipart-aware cross-encryption copy
|
||||
glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
|
||||
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
|
||||
}
|
||||
|
||||
if state.SrcSSEKMS && state.DstSSEC {
|
||||
// SSE-KMS → SSE-C: use unified multipart-aware cross-encryption copy
|
||||
glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
|
||||
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
|
||||
}
|
||||
|
||||
// Handle SSE-S3 cross-encryption scenarios
|
||||
if state.SrcSSES3 || state.DstSSES3 {
|
||||
// Any scenario involving SSE-S3 uses streaming copy
|
||||
chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
|
||||
return chunks, nil, err
|
||||
}
|
||||
|
||||
return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
|
||||
}
|
||||
|
||||
// shouldUseStreamingCopy determines if streaming copy should be used
|
||||
func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
|
||||
// Use streaming copy for large files or when beneficial
|
||||
fileSize := entry.Attributes.FileSize
|
||||
|
||||
// Use streaming for files larger than 10MB
|
||||
if fileSize > 10*1024*1024 {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if this is a multipart encrypted object
|
||||
isMultipartEncrypted := false
|
||||
if state.IsSourceEncrypted() {
|
||||
encryptedChunks := 0
|
||||
for _, chunk := range entry.GetChunks() {
|
||||
if chunk.GetSseType() != filer_pb.SSEType_NONE {
|
||||
encryptedChunks++
|
||||
}
|
||||
}
|
||||
isMultipartEncrypted = encryptedChunks > 1
|
||||
}
|
||||
|
||||
// For multipart encrypted objects, avoid streaming copy to use per-chunk metadata approach
|
||||
if isMultipartEncrypted {
|
||||
glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
|
||||
return false
|
||||
}
|
||||
|
||||
// Use streaming for cross-encryption scenarios (for single-part objects only)
|
||||
if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
|
||||
srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
|
||||
dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
|
||||
if srcType != dstType {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Use streaming for compressed files
|
||||
if isCompressedEntry(entry) {
|
||||
return true
|
||||
}
|
||||
|
||||
// Use streaming for SSE-S3 scenarios (always)
|
||||
if state.SrcSSES3 || state.DstSSES3 {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// executeStreamingReencryptCopy performs streaming re-encryption copy
|
||||
func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
|
||||
// Create streaming copy manager
|
||||
streamingManager := NewStreamingCopyManager(s3a)
|
||||
|
||||
// Execute streaming copy
|
||||
return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
|
||||
// All other cross-encryption scenarios use unified multipart copy
|
||||
// This includes: SSE-C↔SSE-KMS, SSE-C↔SSE-S3, SSE-KMS↔SSE-S3, SSE-S3↔SSE-S3
|
||||
glog.V(2).Infof("Cross-encryption copy: using unified multipart copy")
|
||||
return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
|
||||
}
|
||||
|
||||
@@ -59,18 +59,19 @@ func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteStreamingCopy performs a streaming copy operation
|
||||
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
|
||||
// ExecuteStreamingCopy performs a streaming copy operation and returns the encryption spec
|
||||
// The encryption spec is needed for SSE-S3 to properly set destination metadata (fixes GitHub #7562)
|
||||
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, *EncryptionSpec, error) {
|
||||
// Create streaming copy specification
|
||||
spec, err := scm.createStreamingSpec(entry, r, state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create streaming spec: %w", err)
|
||||
return nil, nil, fmt.Errorf("create streaming spec: %w", err)
|
||||
}
|
||||
|
||||
// Create source reader from entry
|
||||
sourceReader, err := scm.createSourceReader(entry)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create source reader: %w", err)
|
||||
return nil, nil, fmt.Errorf("create source reader: %w", err)
|
||||
}
|
||||
defer sourceReader.Close()
|
||||
|
||||
@@ -79,11 +80,16 @@ func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry
|
||||
// Create processing pipeline
|
||||
processedReader, err := scm.createProcessingPipeline(spec)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create processing pipeline: %w", err)
|
||||
return nil, nil, fmt.Errorf("create processing pipeline: %w", err)
|
||||
}
|
||||
|
||||
// Stream to destination
|
||||
return scm.streamToDestination(ctx, processedReader, spec, dstPath)
|
||||
chunks, err := scm.streamToDestination(ctx, processedReader, spec, dstPath)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return chunks, spec.EncryptionSpec, nil
|
||||
}
|
||||
|
||||
// createStreamingSpec creates a streaming specification based on copy parameters
|
||||
@@ -453,8 +459,8 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
|
||||
for {
|
||||
n, err := reader.Read(buffer)
|
||||
if n > 0 {
|
||||
// Create chunk for this data
|
||||
chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
|
||||
// Create chunk for this data, setting SSE type and per-chunk metadata (including chunk-specific IVs for SSE-S3)
|
||||
chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath, spec.EncryptionSpec)
|
||||
if chunkErr != nil {
|
||||
return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
|
||||
}
|
||||
@@ -474,7 +480,7 @@ func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.R
|
||||
}
|
||||
|
||||
// createChunkFromData creates a chunk from streaming data
|
||||
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
|
||||
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string, encSpec *EncryptionSpec) (*filer_pb.FileChunk, error) {
|
||||
// Assign new volume
|
||||
assignResult, err := scm.s3a.assignNewVolume(dstPath)
|
||||
if err != nil {
|
||||
@@ -487,6 +493,42 @@ func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64,
|
||||
Size: uint64(len(data)),
|
||||
}
|
||||
|
||||
// Set SSE type and metadata on chunk if destination is encrypted
|
||||
// This is critical for GetObject to know to decrypt the data - fixes GitHub #7562
|
||||
if encSpec != nil && encSpec.NeedsEncryption {
|
||||
switch encSpec.DestinationType {
|
||||
case EncryptionTypeSSEC:
|
||||
chunk.SseType = filer_pb.SSEType_SSE_C
|
||||
// SSE-C metadata is handled at object level, not per-chunk for streaming copy
|
||||
case EncryptionTypeSSEKMS:
|
||||
chunk.SseType = filer_pb.SSEType_SSE_KMS
|
||||
// SSE-KMS metadata is handled at object level, not per-chunk for streaming copy
|
||||
case EncryptionTypeSSES3:
|
||||
chunk.SseType = filer_pb.SSEType_SSE_S3
|
||||
// Create per-chunk SSE-S3 metadata with chunk-specific IV
|
||||
if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
|
||||
// Calculate chunk-specific IV using base IV and chunk offset
|
||||
baseIV := encSpec.DestinationIV
|
||||
if len(baseIV) == 0 {
|
||||
return nil, fmt.Errorf("SSE-S3 encryption requires DestinationIV to be set for chunk at offset %d", offset)
|
||||
}
|
||||
chunkIV, _ := calculateIVWithOffset(baseIV, offset)
|
||||
// Create chunk key with the chunk-specific IV
|
||||
chunkSSEKey := &SSES3Key{
|
||||
Key: sseKey.Key,
|
||||
KeyID: sseKey.KeyID,
|
||||
Algorithm: sseKey.Algorithm,
|
||||
IV: chunkIV,
|
||||
}
|
||||
chunkMetadata, serErr := SerializeSSES3Metadata(chunkSSEKey)
|
||||
if serErr != nil {
|
||||
return nil, fmt.Errorf("failed to serialize chunk SSE-S3 metadata: %w", serErr)
|
||||
}
|
||||
chunk.SseMetadata = chunkMetadata
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Set file ID
|
||||
if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
|
||||
return nil, err
|
||||
|
||||
Reference in New Issue
Block a user