S3 API: Add SSE-C (#7143)
* implement sse-c * fix Content-Range * adding tests * Update s3_sse_c_test.go * copy sse-c objects * adding tests * refactor * multi reader * remove extra write header call * refactor * SSE-C encrypted objects do not support HTTP Range requests * robust * fix server starts * Update Makefile * Update Makefile * ci: remove SSE-C integration tests and workflows; delete test/s3/encryption/ * s3: SSE-C MD5 must be base64 (case-sensitive); fix validation, comparisons, metadata storage; update tests * minor * base64 * Update SSE-C_IMPLEMENTATION.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update weed/s3api/s3api_object_handlers.go Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * Update SSE-C_IMPLEMENTATION.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * address comments * fix test * fix compilation --------- Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
package s3api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
@@ -160,11 +162,17 @@ func (s3a *S3ApiServer) CopyObjectHandler(w http.ResponseWriter, r *http.Request
|
||||
// Just copy the entry structure without chunks for zero-size files
|
||||
dstEntry.Chunks = nil
|
||||
} else {
|
||||
// Replicate chunks for files with content
|
||||
dstChunks, err := s3a.copyChunks(entry, r.URL.Path)
|
||||
// Handle SSE-C copy with smart fast/slow path selection
|
||||
dstChunks, err := s3a.copyChunksWithSSEC(entry, r)
|
||||
if err != nil {
|
||||
glog.Errorf("CopyObjectHandler copy chunks error: %v", err)
|
||||
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
|
||||
glog.Errorf("CopyObjectHandler copy chunks with SSE-C error: %v", err)
|
||||
// Use shared error mapping helper
|
||||
errCode := MapSSECErrorToS3Error(err)
|
||||
// For copy operations, if the error is not recognized, use InternalError
|
||||
if errCode == s3err.ErrInvalidRequest {
|
||||
errCode = s3err.ErrInternalError
|
||||
}
|
||||
s3err.WriteErrorResponse(w, r, errCode)
|
||||
return
|
||||
}
|
||||
dstEntry.Chunks = dstChunks
|
||||
@@ -591,7 +599,8 @@ func processMetadataBytes(reqHeader http.Header, existing map[string][]byte, rep
|
||||
// copyChunks replicates chunks from source entry to destination entry
|
||||
func (s3a *S3ApiServer) copyChunks(entry *filer_pb.Entry, dstPath string) ([]*filer_pb.FileChunk, error) {
|
||||
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
|
||||
executor := util.NewLimitedConcurrentExecutor(4) // Limit to 4 concurrent operations
|
||||
const defaultChunkCopyConcurrency = 4
|
||||
executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations
|
||||
errChan := make(chan error, len(entry.GetChunks()))
|
||||
|
||||
for i, chunk := range entry.GetChunks() {
|
||||
@@ -777,7 +786,8 @@ func (s3a *S3ApiServer) copyChunksForRange(entry *filer_pb.Entry, startOffset, e
|
||||
|
||||
// Copy the relevant chunks using a specialized method for range copies
|
||||
dstChunks := make([]*filer_pb.FileChunk, len(relevantChunks))
|
||||
executor := util.NewLimitedConcurrentExecutor(4)
|
||||
const defaultChunkCopyConcurrency = 4
|
||||
executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency)
|
||||
errChan := make(chan error, len(relevantChunks))
|
||||
|
||||
// Create a map to track original chunks for each relevant chunk
|
||||
@@ -997,3 +1007,136 @@ func (s3a *S3ApiServer) downloadChunkData(srcUrl string, offset, size int64) ([]
|
||||
}
|
||||
return chunkData, nil
|
||||
}
|
||||
|
||||
// copyChunksWithSSEC handles SSE-C aware copying with smart fast/slow path selection
|
||||
func (s3a *S3ApiServer) copyChunksWithSSEC(entry *filer_pb.Entry, r *http.Request) ([]*filer_pb.FileChunk, error) {
|
||||
// Parse SSE-C headers
|
||||
copySourceKey, err := ParseSSECCopySourceHeaders(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
destKey, err := ParseSSECHeaders(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Determine copy strategy
|
||||
strategy, err := DetermineSSECCopyStrategy(entry.Extended, copySourceKey, destKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(2).Infof("SSE-C copy strategy for %s: %v", r.URL.Path, strategy)
|
||||
|
||||
switch strategy {
|
||||
case SSECCopyDirect:
|
||||
// FAST PATH: Direct chunk copy
|
||||
glog.V(2).Infof("Using fast path: direct chunk copy for %s", r.URL.Path)
|
||||
return s3a.copyChunks(entry, r.URL.Path)
|
||||
|
||||
case SSECCopyReencrypt:
|
||||
// SLOW PATH: Decrypt and re-encrypt
|
||||
glog.V(2).Infof("Using slow path: decrypt/re-encrypt for %s", r.URL.Path)
|
||||
return s3a.copyChunksWithReencryption(entry, copySourceKey, destKey, r.URL.Path)
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown SSE-C copy strategy: %v", strategy)
|
||||
}
|
||||
}
|
||||
|
||||
// copyChunksWithReencryption handles the slow path: decrypt source and re-encrypt for destination
|
||||
func (s3a *S3ApiServer) copyChunksWithReencryption(entry *filer_pb.Entry, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) ([]*filer_pb.FileChunk, error) {
|
||||
dstChunks := make([]*filer_pb.FileChunk, len(entry.GetChunks()))
|
||||
const defaultChunkCopyConcurrency = 4
|
||||
executor := util.NewLimitedConcurrentExecutor(defaultChunkCopyConcurrency) // Limit to configurable concurrent operations
|
||||
errChan := make(chan error, len(entry.GetChunks()))
|
||||
|
||||
for i, chunk := range entry.GetChunks() {
|
||||
chunkIndex := i
|
||||
executor.Execute(func() {
|
||||
dstChunk, err := s3a.copyChunkWithReencryption(chunk, copySourceKey, destKey, dstPath)
|
||||
if err != nil {
|
||||
errChan <- fmt.Errorf("chunk %d: %v", chunkIndex, err)
|
||||
return
|
||||
}
|
||||
dstChunks[chunkIndex] = dstChunk
|
||||
errChan <- nil
|
||||
})
|
||||
}
|
||||
|
||||
// Wait for all operations to complete and check for errors
|
||||
for i := 0; i < len(entry.GetChunks()); i++ {
|
||||
if err := <-errChan; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return dstChunks, nil
|
||||
}
|
||||
|
||||
// copyChunkWithReencryption copies a single chunk with decrypt/re-encrypt
|
||||
func (s3a *S3ApiServer) copyChunkWithReencryption(chunk *filer_pb.FileChunk, copySourceKey *SSECustomerKey, destKey *SSECustomerKey, dstPath string) (*filer_pb.FileChunk, error) {
|
||||
// Create destination chunk
|
||||
dstChunk := s3a.createDestinationChunk(chunk, chunk.Offset, chunk.Size)
|
||||
|
||||
// Prepare chunk copy (assign new volume and get source URL)
|
||||
assignResult, srcUrl, err := s3a.prepareChunkCopy(chunk.GetFileIdString(), dstPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set file ID on destination chunk
|
||||
if err := s3a.setChunkFileId(dstChunk, assignResult); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Download encrypted chunk data
|
||||
encryptedData, err := s3a.downloadChunkData(srcUrl, 0, int64(chunk.Size))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("download encrypted chunk data: %w", err)
|
||||
}
|
||||
|
||||
var finalData []byte
|
||||
|
||||
// Decrypt if source is encrypted
|
||||
if copySourceKey != nil {
|
||||
decryptedReader, decErr := CreateSSECDecryptedReader(bytes.NewReader(encryptedData), copySourceKey)
|
||||
if decErr != nil {
|
||||
return nil, fmt.Errorf("create decrypted reader: %w", decErr)
|
||||
}
|
||||
|
||||
decryptedData, readErr := io.ReadAll(decryptedReader)
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("decrypt chunk data: %w", readErr)
|
||||
}
|
||||
finalData = decryptedData
|
||||
} else {
|
||||
// Source is unencrypted
|
||||
finalData = encryptedData
|
||||
}
|
||||
|
||||
// Re-encrypt if destination should be encrypted
|
||||
if destKey != nil {
|
||||
encryptedReader, encErr := CreateSSECEncryptedReader(bytes.NewReader(finalData), destKey)
|
||||
if encErr != nil {
|
||||
return nil, fmt.Errorf("create encrypted reader: %w", encErr)
|
||||
}
|
||||
|
||||
reencryptedData, readErr := io.ReadAll(encryptedReader)
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("re-encrypt chunk data: %w", readErr)
|
||||
}
|
||||
finalData = reencryptedData
|
||||
|
||||
// Update chunk size to include IV
|
||||
dstChunk.Size = uint64(len(finalData))
|
||||
}
|
||||
|
||||
// Upload the processed data
|
||||
if err := s3a.uploadChunkData(finalData, assignResult); err != nil {
|
||||
return nil, fmt.Errorf("upload processed chunk data: %w", err)
|
||||
}
|
||||
|
||||
return dstChunk, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user