* chore: remove unreachable dead code across the codebase Remove ~50,000 lines of unreachable code identified by static analysis. Major removals: - weed/filer/redis_lua: entire unused Redis Lua filer store implementation - weed/wdclient/net2, resource_pool: unused connection/resource pool packages - weed/plugin/worker/lifecycle: unused lifecycle plugin worker - weed/s3api: unused S3 policy templates, presigned URL IAM, streaming copy, multipart IAM, key rotation, and various SSE helper functions - weed/mq/kafka: unused partition mapping, compression, schema, and protocol functions - weed/mq/offset: unused SQL storage and migration code - weed/worker: unused registry, task, and monitoring functions - weed/query: unused SQL engine, parquet scanner, and type functions - weed/shell: unused EC proportional rebalance functions - weed/storage/erasure_coding/distribution: unused distribution analysis functions - Individual unreachable functions removed from 150+ files across admin, credential, filer, iam, kms, mount, mq, operation, pb, s3api, server, shell, storage, topology, and util packages * fix(s3): reset shared memory store in IAM test to prevent flaky failure TestLoadIAMManagerFromConfig_EmptyConfigWithFallbackKey was flaky because the MemoryStore credential backend is a singleton registered via init(). Earlier tests that create anonymous identities pollute the shared store, causing LookupAnonymous() to unexpectedly return true. Fix by calling Reset() on the memory store before the test runs. * style: run gofmt on changed files * fix: restore KMS functions used by integration tests * fix(plugin): prevent panic on send to closed worker session channel The Plugin.sendToWorker method could panic with "send on closed channel" when a worker disconnected while a message was being sent. The race was between streamSession.close() closing the outgoing channel and sendToWorker writing to it concurrently. Add a done channel to streamSession that is closed before the outgoing channel, and check it in sendToWorker's select to safely detect closed sessions without panicking.
932 lines
32 KiB
Go
932 lines
32 KiB
Go
package s3api
|
|
|
|
import (
|
|
"context"
|
|
"crypto/aes"
|
|
"crypto/cipher"
|
|
"crypto/rand"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/kms"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
|
|
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
|
|
)
|
|
|
|
// Compiled regex patterns for KMS key validation
|
|
var (
|
|
uuidRegex = regexp.MustCompile(`^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}$`)
|
|
arnRegex = regexp.MustCompile(`^arn:aws:kms:[a-z0-9-]+:\d{12}:(key|alias)/.+$`)
|
|
)
|
|
|
|
// SSEKMSKey contains the metadata for an SSE-KMS encrypted object
|
|
type SSEKMSKey struct {
|
|
KeyID string // The KMS key ID used
|
|
EncryptedDataKey []byte // The encrypted data encryption key
|
|
EncryptionContext map[string]string // The encryption context used
|
|
BucketKeyEnabled bool // Whether S3 Bucket Keys are enabled
|
|
IV []byte // The initialization vector for encryption
|
|
ChunkOffset int64 // Offset of this chunk within the original part (for IV calculation)
|
|
}
|
|
|
|
// SSEKMSMetadata represents the metadata stored with SSE-KMS objects
|
|
type SSEKMSMetadata struct {
|
|
Algorithm string `json:"algorithm"` // "aws:kms"
|
|
KeyID string `json:"keyId"` // KMS key identifier
|
|
EncryptedDataKey string `json:"encryptedDataKey"` // Base64-encoded encrypted data key
|
|
EncryptionContext map[string]string `json:"encryptionContext"` // Encryption context
|
|
BucketKeyEnabled bool `json:"bucketKeyEnabled"` // S3 Bucket Key optimization
|
|
IV string `json:"iv"` // Base64-encoded initialization vector
|
|
PartOffset int64 `json:"partOffset"` // Offset within original multipart part (for IV calculation)
|
|
}
|
|
|
|
const (
|
|
// Default data key size (256 bits)
|
|
DataKeySize = 32
|
|
)
|
|
|
|
// Bucket key cache TTL (moved to be used with per-bucket cache)
|
|
const BucketKeyCacheTTL = time.Hour
|
|
|
|
// CreateSSEKMSEncryptedReaderWithBucketKey creates an encrypted reader with optional S3 Bucket Keys optimization
|
|
func CreateSSEKMSEncryptedReaderWithBucketKey(r io.Reader, keyID string, encryptionContext map[string]string, bucketKeyEnabled bool) (io.Reader, *SSEKMSKey, error) {
|
|
if bucketKeyEnabled {
|
|
// Use S3 Bucket Keys optimization - try to get or create a bucket-level data key
|
|
// Note: This is a simplified implementation. In practice, this would need
|
|
// access to the bucket name and S3ApiServer instance for proper per-bucket caching.
|
|
// For now, generate per-object keys (bucket key optimization disabled)
|
|
glog.V(2).Infof("Bucket key optimization requested but not fully implemented yet - using per-object keys")
|
|
bucketKeyEnabled = false
|
|
}
|
|
|
|
// Generate data key using common utility
|
|
dataKeyResult, err := generateKMSDataKey(keyID, encryptionContext)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Ensure we clear the plaintext data key from memory when done
|
|
defer clearKMSDataKey(dataKeyResult)
|
|
|
|
// Generate a random IV for CTR mode
|
|
// Note: AES-CTR is used for object data encryption (not AES-GCM) because:
|
|
// 1. CTR mode supports streaming encryption for large objects
|
|
// 2. CTR mode supports range requests (seek to arbitrary positions)
|
|
// 3. This matches AWS S3 and other S3-compatible implementations
|
|
// The KMS data key encryption (separate layer) uses AES-GCM for authentication
|
|
iv := make([]byte, s3_constants.AESBlockSize)
|
|
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to generate IV: %v", err)
|
|
}
|
|
|
|
// Create CTR mode cipher stream
|
|
stream := cipher.NewCTR(dataKeyResult.Block, iv)
|
|
|
|
// Create the SSE-KMS metadata using utility function
|
|
sseKey := createSSEKMSKey(dataKeyResult, encryptionContext, bucketKeyEnabled, iv, 0)
|
|
|
|
// The IV is stored in SSE key metadata, so the encrypted stream does not need to prepend the IV
|
|
// This ensures correct Content-Length for clients
|
|
encryptedReader := &cipher.StreamReader{S: stream, R: r}
|
|
|
|
// Store IV in the SSE key for metadata storage
|
|
sseKey.IV = iv
|
|
|
|
return encryptedReader, sseKey, nil
|
|
}
|
|
|
|
// CreateSSEKMSEncryptedReaderWithBaseIVAndOffset creates an SSE-KMS encrypted reader using a provided base IV and offset
|
|
// This is used for multipart uploads where all chunks need unique IVs to prevent IV reuse vulnerabilities
|
|
func CreateSSEKMSEncryptedReaderWithBaseIVAndOffset(r io.Reader, keyID string, encryptionContext map[string]string, bucketKeyEnabled bool, baseIV []byte, offset int64) (io.Reader, *SSEKMSKey, error) {
|
|
if err := ValidateIV(baseIV, "base IV"); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Generate data key using common utility
|
|
dataKeyResult, err := generateKMSDataKey(keyID, encryptionContext)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
// Ensure we clear the plaintext data key from memory when done
|
|
defer clearKMSDataKey(dataKeyResult)
|
|
|
|
// Calculate unique IV using base IV and offset to prevent IV reuse in multipart uploads
|
|
// Skip is not used here because we're encrypting from the start (not reading a range)
|
|
iv, _ := calculateIVWithOffset(baseIV, offset)
|
|
|
|
// Create CTR mode cipher stream
|
|
stream := cipher.NewCTR(dataKeyResult.Block, iv)
|
|
|
|
// Create the SSE-KMS metadata using utility function
|
|
sseKey := createSSEKMSKey(dataKeyResult, encryptionContext, bucketKeyEnabled, iv, offset)
|
|
|
|
// The IV is stored in SSE key metadata, so the encrypted stream does not need to prepend the IV
|
|
// This ensures correct Content-Length for clients
|
|
encryptedReader := &cipher.StreamReader{S: stream, R: r}
|
|
|
|
return encryptedReader, sseKey, nil
|
|
}
|
|
|
|
// hashEncryptionContext creates a deterministic hash of the encryption context
|
|
func hashEncryptionContext(encryptionContext map[string]string) string {
|
|
if len(encryptionContext) == 0 {
|
|
return "empty"
|
|
}
|
|
|
|
// Create a deterministic representation of the context
|
|
hash := sha256.New()
|
|
|
|
// Sort keys to ensure deterministic hash
|
|
keys := make([]string, 0, len(encryptionContext))
|
|
for k := range encryptionContext {
|
|
keys = append(keys, k)
|
|
}
|
|
|
|
sort.Strings(keys)
|
|
|
|
// Hash the sorted key-value pairs
|
|
for _, k := range keys {
|
|
hash.Write([]byte(k))
|
|
hash.Write([]byte("="))
|
|
hash.Write([]byte(encryptionContext[k]))
|
|
hash.Write([]byte(";"))
|
|
}
|
|
|
|
return hex.EncodeToString(hash.Sum(nil))[:16] // Use first 16 chars for brevity
|
|
}
|
|
|
|
// getBucketDataKey retrieves or creates a cached bucket-level data key for SSE-KMS
|
|
// This is a simplified implementation that demonstrates the per-bucket caching concept
|
|
// In a full implementation, this would integrate with the actual bucket configuration system
|
|
func getBucketDataKey(bucketName, keyID string, encryptionContext map[string]string, bucketCache *BucketKMSCache) (*kms.GenerateDataKeyResponse, error) {
|
|
// Create context hash for cache key
|
|
contextHash := hashEncryptionContext(encryptionContext)
|
|
cacheKey := fmt.Sprintf("%s:%s", keyID, contextHash)
|
|
|
|
// Try to get from cache first if cache is available
|
|
if bucketCache != nil {
|
|
if cacheEntry, found := bucketCache.Get(cacheKey); found {
|
|
if dataKey, ok := cacheEntry.DataKey.(*kms.GenerateDataKeyResponse); ok {
|
|
glog.V(3).Infof("Using cached bucket key for bucket %s, keyID %s", bucketName, keyID)
|
|
return dataKey, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Cache miss - generate new data key
|
|
kmsProvider := kms.GetGlobalKMS()
|
|
if kmsProvider == nil {
|
|
return nil, fmt.Errorf("KMS is not configured")
|
|
}
|
|
|
|
dataKeyReq := &kms.GenerateDataKeyRequest{
|
|
KeyID: keyID,
|
|
KeySpec: kms.KeySpecAES256,
|
|
EncryptionContext: encryptionContext,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
dataKeyResp, err := kmsProvider.GenerateDataKey(ctx, dataKeyReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to generate bucket data key: %v", err)
|
|
}
|
|
|
|
// Cache the data key for future use if cache is available
|
|
if bucketCache != nil {
|
|
bucketCache.Set(cacheKey, keyID, dataKeyResp, BucketKeyCacheTTL)
|
|
glog.V(2).Infof("Generated and cached new bucket key for bucket %s, keyID %s", bucketName, keyID)
|
|
} else {
|
|
glog.V(2).Infof("Generated new bucket key for bucket %s, keyID %s (caching disabled)", bucketName, keyID)
|
|
}
|
|
|
|
return dataKeyResp, nil
|
|
}
|
|
|
|
// CreateSSEKMSEncryptedReaderForBucket creates an encrypted reader with bucket-specific caching
|
|
// This method is part of S3ApiServer to access bucket configuration and caching
|
|
func (s3a *S3ApiServer) CreateSSEKMSEncryptedReaderForBucket(r io.Reader, bucketName, keyID string, encryptionContext map[string]string, bucketKeyEnabled bool) (io.Reader, *SSEKMSKey, error) {
|
|
var dataKeyResp *kms.GenerateDataKeyResponse
|
|
var err error
|
|
|
|
if bucketKeyEnabled {
|
|
// Use S3 Bucket Keys optimization with persistent per-bucket caching
|
|
bucketCache, err := s3a.getBucketKMSCache(bucketName)
|
|
if err != nil {
|
|
glog.V(2).Infof("Failed to get bucket KMS cache for %s, falling back to per-object key: %v", bucketName, err)
|
|
bucketKeyEnabled = false
|
|
} else {
|
|
dataKeyResp, err = getBucketDataKey(bucketName, keyID, encryptionContext, bucketCache)
|
|
if err != nil {
|
|
// Fall back to per-object key generation if bucket key fails
|
|
glog.V(2).Infof("Bucket key generation failed for bucket %s, falling back to per-object key: %v", bucketName, err)
|
|
bucketKeyEnabled = false
|
|
}
|
|
}
|
|
}
|
|
|
|
if !bucketKeyEnabled {
|
|
// Generate a per-object data encryption key using KMS
|
|
kmsProvider := kms.GetGlobalKMS()
|
|
if kmsProvider == nil {
|
|
return nil, nil, fmt.Errorf("KMS is not configured")
|
|
}
|
|
|
|
dataKeyReq := &kms.GenerateDataKeyRequest{
|
|
KeyID: keyID,
|
|
KeySpec: kms.KeySpecAES256,
|
|
EncryptionContext: encryptionContext,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
dataKeyResp, err = kmsProvider.GenerateDataKey(ctx, dataKeyReq)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to generate data key: %v", err)
|
|
}
|
|
}
|
|
|
|
// Ensure we clear the plaintext data key from memory when done
|
|
defer kms.ClearSensitiveData(dataKeyResp.Plaintext)
|
|
|
|
// Create AES cipher with the data key
|
|
block, err := aes.NewCipher(dataKeyResp.Plaintext)
|
|
if err != nil {
|
|
return nil, nil, fmt.Errorf("failed to create AES cipher: %v", err)
|
|
}
|
|
|
|
// Generate a random IV for CTR mode
|
|
iv := make([]byte, 16) // AES block size
|
|
if _, err := io.ReadFull(rand.Reader, iv); err != nil {
|
|
return nil, nil, fmt.Errorf("failed to generate IV: %v", err)
|
|
}
|
|
|
|
// Create CTR mode cipher stream
|
|
stream := cipher.NewCTR(block, iv)
|
|
|
|
// Create the encrypting reader
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: keyID,
|
|
EncryptedDataKey: dataKeyResp.CiphertextBlob,
|
|
EncryptionContext: encryptionContext,
|
|
BucketKeyEnabled: bucketKeyEnabled,
|
|
IV: iv,
|
|
}
|
|
|
|
return &cipher.StreamReader{S: stream, R: r}, sseKey, nil
|
|
}
|
|
|
|
// getBucketKMSCache gets or creates the persistent KMS cache for a bucket
|
|
func (s3a *S3ApiServer) getBucketKMSCache(bucketName string) (*BucketKMSCache, error) {
|
|
// Get bucket configuration
|
|
bucketConfig, errCode := s3a.getBucketConfig(bucketName)
|
|
if errCode != s3err.ErrNone {
|
|
if errCode == s3err.ErrNoSuchBucket {
|
|
return nil, fmt.Errorf("bucket %s does not exist", bucketName)
|
|
}
|
|
return nil, fmt.Errorf("failed to get bucket config: %v", errCode)
|
|
}
|
|
|
|
// Initialize KMS cache if it doesn't exist
|
|
if bucketConfig.KMSKeyCache == nil {
|
|
bucketConfig.KMSKeyCache = NewBucketKMSCache(bucketName, BucketKeyCacheTTL)
|
|
glog.V(3).Infof("Initialized new KMS cache for bucket %s", bucketName)
|
|
}
|
|
|
|
return bucketConfig.KMSKeyCache, nil
|
|
}
|
|
|
|
// CleanupBucketKMSCache performs cleanup of expired KMS keys for a specific bucket
|
|
func (s3a *S3ApiServer) CleanupBucketKMSCache(bucketName string) int {
|
|
bucketCache, err := s3a.getBucketKMSCache(bucketName)
|
|
if err != nil {
|
|
glog.V(3).Infof("Could not get KMS cache for bucket %s: %v", bucketName, err)
|
|
return 0
|
|
}
|
|
|
|
cleaned := bucketCache.CleanupExpired()
|
|
if cleaned > 0 {
|
|
glog.V(2).Infof("Cleaned up %d expired KMS keys for bucket %s", cleaned, bucketName)
|
|
}
|
|
return cleaned
|
|
}
|
|
|
|
// CleanupAllBucketKMSCaches performs cleanup of expired KMS keys for all buckets
|
|
func (s3a *S3ApiServer) CleanupAllBucketKMSCaches() int {
|
|
totalCleaned := 0
|
|
|
|
// Access the bucket config cache safely
|
|
if s3a.bucketConfigCache != nil {
|
|
s3a.bucketConfigCache.mutex.RLock()
|
|
bucketNames := make([]string, 0, len(s3a.bucketConfigCache.cache))
|
|
for bucketName := range s3a.bucketConfigCache.cache {
|
|
bucketNames = append(bucketNames, bucketName)
|
|
}
|
|
s3a.bucketConfigCache.mutex.RUnlock()
|
|
|
|
// Clean up each bucket's KMS cache
|
|
for _, bucketName := range bucketNames {
|
|
cleaned := s3a.CleanupBucketKMSCache(bucketName)
|
|
totalCleaned += cleaned
|
|
}
|
|
}
|
|
|
|
if totalCleaned > 0 {
|
|
glog.V(2).Infof("Cleaned up %d expired KMS keys across %d bucket caches", totalCleaned, len(s3a.bucketConfigCache.cache))
|
|
}
|
|
return totalCleaned
|
|
}
|
|
|
|
// CreateSSEKMSDecryptedReader creates a decrypted reader using KMS envelope encryption
|
|
func CreateSSEKMSDecryptedReader(r io.Reader, sseKey *SSEKMSKey) (io.Reader, error) {
|
|
kmsProvider := kms.GetGlobalKMS()
|
|
if kmsProvider == nil {
|
|
return nil, fmt.Errorf("KMS is not configured")
|
|
}
|
|
|
|
// Decrypt the data encryption key using KMS
|
|
decryptReq := &kms.DecryptRequest{
|
|
CiphertextBlob: sseKey.EncryptedDataKey,
|
|
EncryptionContext: sseKey.EncryptionContext,
|
|
}
|
|
|
|
ctx := context.Background()
|
|
decryptResp, err := kmsProvider.Decrypt(ctx, decryptReq)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decrypt data key: %v", err)
|
|
}
|
|
|
|
// Ensure we clear the plaintext data key from memory when done
|
|
defer kms.ClearSensitiveData(decryptResp.Plaintext)
|
|
|
|
// Verify the key ID matches (security check)
|
|
if decryptResp.KeyID != sseKey.KeyID {
|
|
return nil, fmt.Errorf("KMS key ID mismatch: expected %s, got %s", sseKey.KeyID, decryptResp.KeyID)
|
|
}
|
|
|
|
// Use the IV from the SSE key metadata, calculating offset if this is a chunked part
|
|
if err := ValidateIV(sseKey.IV, "SSE key IV"); err != nil {
|
|
return nil, fmt.Errorf("invalid IV in SSE key: %w", err)
|
|
}
|
|
|
|
// Calculate the correct IV for this chunk's offset within the original part
|
|
// Note: The skip bytes must be discarded by the caller before reading from the returned reader
|
|
var iv []byte
|
|
if sseKey.ChunkOffset > 0 {
|
|
iv, _ = calculateIVWithOffset(sseKey.IV, sseKey.ChunkOffset)
|
|
// Skip value is ignored here; caller must handle intra-block byte skipping
|
|
} else {
|
|
iv = sseKey.IV
|
|
}
|
|
|
|
// Create AES cipher with the decrypted data key
|
|
block, err := aes.NewCipher(decryptResp.Plaintext)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create AES cipher: %v", err)
|
|
}
|
|
|
|
// Create CTR mode cipher stream for decryption
|
|
// Note: AES-CTR is used for object data decryption to match the encryption mode
|
|
stream := cipher.NewCTR(block, iv)
|
|
decryptReader := &cipher.StreamReader{S: stream, R: r}
|
|
|
|
// Wrap with closer if the underlying reader implements io.Closer
|
|
if closer, ok := r.(io.Closer); ok {
|
|
return &decryptReaderCloser{
|
|
Reader: decryptReader,
|
|
underlyingCloser: closer,
|
|
}, nil
|
|
}
|
|
|
|
// Return the decrypted reader
|
|
return decryptReader, nil
|
|
}
|
|
|
|
// BuildEncryptionContext creates the encryption context for S3 objects
|
|
func BuildEncryptionContext(bucketName, objectKey string, useBucketKey bool) map[string]string {
|
|
return kms.BuildS3EncryptionContext(bucketName, objectKey, useBucketKey)
|
|
}
|
|
|
|
// parseEncryptionContext parses the user-provided encryption context from base64 JSON
|
|
func parseEncryptionContext(contextHeader string) (map[string]string, error) {
|
|
if contextHeader == "" {
|
|
return nil, nil
|
|
}
|
|
|
|
// Decode base64
|
|
contextBytes, err := base64.StdEncoding.DecodeString(contextHeader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid base64 encoding in encryption context: %w", err)
|
|
}
|
|
|
|
// Parse JSON
|
|
var context map[string]string
|
|
if err := json.Unmarshal(contextBytes, &context); err != nil {
|
|
return nil, fmt.Errorf("invalid JSON in encryption context: %w", err)
|
|
}
|
|
|
|
// Validate context keys and values
|
|
for k, v := range context {
|
|
if k == "" || v == "" {
|
|
return nil, fmt.Errorf("encryption context keys and values cannot be empty")
|
|
}
|
|
// AWS KMS has limits on context key/value length (256 chars each)
|
|
if len(k) > 256 || len(v) > 256 {
|
|
return nil, fmt.Errorf("encryption context key or value too long (max 256 characters)")
|
|
}
|
|
}
|
|
|
|
return context, nil
|
|
}
|
|
|
|
// SerializeSSEKMSMetadata serializes SSE-KMS metadata for storage in object metadata
|
|
func SerializeSSEKMSMetadata(sseKey *SSEKMSKey) ([]byte, error) {
|
|
if err := ValidateSSEKMSKey(sseKey); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
metadata := &SSEKMSMetadata{
|
|
Algorithm: s3_constants.SSEAlgorithmKMS,
|
|
KeyID: sseKey.KeyID,
|
|
EncryptedDataKey: base64.StdEncoding.EncodeToString(sseKey.EncryptedDataKey),
|
|
EncryptionContext: sseKey.EncryptionContext,
|
|
BucketKeyEnabled: sseKey.BucketKeyEnabled,
|
|
IV: base64.StdEncoding.EncodeToString(sseKey.IV), // Store IV for decryption
|
|
PartOffset: sseKey.ChunkOffset, // Store within-part offset
|
|
}
|
|
|
|
data, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
glog.V(4).Infof("Serialized SSE-KMS metadata: keyID=%s, bucketKey=%t", sseKey.KeyID, sseKey.BucketKeyEnabled)
|
|
return data, nil
|
|
}
|
|
|
|
// DeserializeSSEKMSMetadata deserializes SSE-KMS metadata from storage and reconstructs the SSE-KMS key
|
|
func DeserializeSSEKMSMetadata(data []byte) (*SSEKMSKey, error) {
|
|
if len(data) == 0 {
|
|
return nil, fmt.Errorf("empty SSE-KMS metadata")
|
|
}
|
|
|
|
var metadata SSEKMSMetadata
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal SSE-KMS metadata: %w", err)
|
|
}
|
|
|
|
// Validate algorithm - be lenient with missing/empty algorithm for backward compatibility
|
|
if metadata.Algorithm != "" && metadata.Algorithm != s3_constants.SSEAlgorithmKMS {
|
|
return nil, fmt.Errorf("invalid SSE-KMS algorithm: %s", metadata.Algorithm)
|
|
}
|
|
|
|
// Set default algorithm if empty
|
|
if metadata.Algorithm == "" {
|
|
metadata.Algorithm = s3_constants.SSEAlgorithmKMS
|
|
}
|
|
|
|
// Decode the encrypted data key
|
|
encryptedDataKey, err := base64.StdEncoding.DecodeString(metadata.EncryptedDataKey)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode encrypted data key: %w", err)
|
|
}
|
|
|
|
// Decode the IV
|
|
var iv []byte
|
|
if metadata.IV != "" {
|
|
iv, err = base64.StdEncoding.DecodeString(metadata.IV)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to decode IV: %w", err)
|
|
}
|
|
}
|
|
|
|
sseKey := &SSEKMSKey{
|
|
KeyID: metadata.KeyID,
|
|
EncryptedDataKey: encryptedDataKey,
|
|
EncryptionContext: metadata.EncryptionContext,
|
|
BucketKeyEnabled: metadata.BucketKeyEnabled,
|
|
IV: iv, // Restore IV for decryption
|
|
ChunkOffset: metadata.PartOffset, // Use stored within-part offset
|
|
}
|
|
|
|
glog.V(4).Infof("Deserialized SSE-KMS metadata: keyID=%s, bucketKey=%t", sseKey.KeyID, sseKey.BucketKeyEnabled)
|
|
return sseKey, nil
|
|
}
|
|
|
|
// SSECMetadata represents SSE-C metadata for per-chunk storage (unified with SSE-KMS approach)
|
|
type SSECMetadata struct {
|
|
Algorithm string `json:"algorithm"` // SSE-C algorithm (always "AES256")
|
|
IV string `json:"iv"` // Base64-encoded initialization vector for this chunk
|
|
KeyMD5 string `json:"keyMD5"` // MD5 of the customer-provided key
|
|
PartOffset int64 `json:"partOffset"` // Offset within original multipart part (for IV calculation)
|
|
}
|
|
|
|
// SerializeSSECMetadata serializes SSE-C metadata for storage in chunk metadata
|
|
func SerializeSSECMetadata(iv []byte, keyMD5 string, partOffset int64) ([]byte, error) {
|
|
if err := ValidateIV(iv, "IV"); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
metadata := &SSECMetadata{
|
|
Algorithm: s3_constants.SSEAlgorithmAES256,
|
|
IV: base64.StdEncoding.EncodeToString(iv),
|
|
KeyMD5: keyMD5,
|
|
PartOffset: partOffset,
|
|
}
|
|
|
|
data, err := json.Marshal(metadata)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to marshal SSE-C metadata: %w", err)
|
|
}
|
|
|
|
glog.V(4).Infof("Serialized SSE-C metadata: keyMD5=%s, partOffset=%d", keyMD5, partOffset)
|
|
return data, nil
|
|
}
|
|
|
|
// DeserializeSSECMetadata deserializes SSE-C metadata from chunk storage
|
|
func DeserializeSSECMetadata(data []byte) (*SSECMetadata, error) {
|
|
if len(data) == 0 {
|
|
return nil, fmt.Errorf("empty SSE-C metadata")
|
|
}
|
|
|
|
var metadata SSECMetadata
|
|
if err := json.Unmarshal(data, &metadata); err != nil {
|
|
return nil, fmt.Errorf("failed to unmarshal SSE-C metadata: %w", err)
|
|
}
|
|
|
|
// Validate algorithm
|
|
if metadata.Algorithm != s3_constants.SSEAlgorithmAES256 {
|
|
return nil, fmt.Errorf("invalid SSE-C algorithm: %s", metadata.Algorithm)
|
|
}
|
|
|
|
// Validate IV
|
|
if metadata.IV == "" {
|
|
return nil, fmt.Errorf("missing IV in SSE-C metadata")
|
|
}
|
|
|
|
if _, err := base64.StdEncoding.DecodeString(metadata.IV); err != nil {
|
|
return nil, fmt.Errorf("invalid base64 IV in SSE-C metadata: %w", err)
|
|
}
|
|
|
|
glog.V(4).Infof("Deserialized SSE-C metadata: keyMD5=%s, partOffset=%d", metadata.KeyMD5, metadata.PartOffset)
|
|
return &metadata, nil
|
|
}
|
|
|
|
// AddSSEKMSResponseHeaders adds SSE-KMS response headers to an HTTP response
|
|
func AddSSEKMSResponseHeaders(w http.ResponseWriter, sseKey *SSEKMSKey) {
|
|
w.Header().Set(s3_constants.AmzServerSideEncryption, s3_constants.SSEAlgorithmKMS)
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionAwsKmsKeyId, sseKey.KeyID)
|
|
|
|
if len(sseKey.EncryptionContext) > 0 {
|
|
// Encode encryption context as base64 JSON
|
|
contextBytes, err := json.Marshal(sseKey.EncryptionContext)
|
|
if err == nil {
|
|
contextB64 := base64.StdEncoding.EncodeToString(contextBytes)
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionContext, contextB64)
|
|
} else {
|
|
glog.Errorf("Failed to encode encryption context: %v", err)
|
|
}
|
|
}
|
|
|
|
if sseKey.BucketKeyEnabled {
|
|
w.Header().Set(s3_constants.AmzServerSideEncryptionBucketKeyEnabled, "true")
|
|
}
|
|
}
|
|
|
|
// IsSSEKMSRequest checks if the request contains SSE-KMS headers
|
|
func IsSSEKMSRequest(r *http.Request) bool {
|
|
// If SSE-C headers are present, this is not an SSE-KMS request (they are mutually exclusive)
|
|
if r.Header.Get(s3_constants.AmzServerSideEncryptionCustomerAlgorithm) != "" {
|
|
return false
|
|
}
|
|
|
|
// According to AWS S3 specification, SSE-KMS is only valid when the encryption header
|
|
// is explicitly set to "aws:kms". The KMS key ID header alone is not sufficient.
|
|
sseAlgorithm := r.Header.Get(s3_constants.AmzServerSideEncryption)
|
|
return sseAlgorithm == s3_constants.SSEAlgorithmKMS
|
|
}
|
|
|
|
// IsSSEKMSEncrypted checks if the metadata indicates SSE-KMS encryption
|
|
func IsSSEKMSEncrypted(metadata map[string][]byte) bool {
|
|
if metadata == nil {
|
|
return false
|
|
}
|
|
|
|
// The canonical way to identify an SSE-KMS encrypted object is by this header.
|
|
if sseAlgorithm, exists := metadata[s3_constants.AmzServerSideEncryption]; exists {
|
|
return string(sseAlgorithm) == s3_constants.SSEAlgorithmKMS
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// MapKMSErrorToS3Error maps KMS errors to appropriate S3 error codes
|
|
func MapKMSErrorToS3Error(err error) s3err.ErrorCode {
|
|
if err == nil {
|
|
return s3err.ErrNone
|
|
}
|
|
|
|
// Check if it's a KMS error
|
|
kmsErr, ok := err.(*kms.KMSError)
|
|
if !ok {
|
|
return s3err.ErrInternalError
|
|
}
|
|
|
|
switch kmsErr.Code {
|
|
case kms.ErrCodeNotFoundException:
|
|
return s3err.ErrKMSKeyNotFound
|
|
case kms.ErrCodeAccessDenied:
|
|
return s3err.ErrKMSAccessDenied
|
|
case kms.ErrCodeKeyUnavailable:
|
|
return s3err.ErrKMSDisabled
|
|
case kms.ErrCodeInvalidKeyUsage:
|
|
return s3err.ErrKMSAccessDenied
|
|
case kms.ErrCodeInvalidCiphertext:
|
|
return s3err.ErrKMSInvalidCiphertext
|
|
default:
|
|
glog.Errorf("Unmapped KMS error: %s - %s", kmsErr.Code, kmsErr.Message)
|
|
return s3err.ErrInternalError
|
|
}
|
|
}
|
|
|
|
// SSEKMSCopyStrategy represents different strategies for copying SSE-KMS encrypted objects
|
|
type SSEKMSCopyStrategy int
|
|
|
|
const (
|
|
// SSEKMSCopyStrategyDirect - Direct chunk copy (same key, no re-encryption needed)
|
|
SSEKMSCopyStrategyDirect SSEKMSCopyStrategy = iota
|
|
// SSEKMSCopyStrategyDecryptEncrypt - Decrypt source and re-encrypt for destination
|
|
SSEKMSCopyStrategyDecryptEncrypt
|
|
)
|
|
|
|
// String returns string representation of the strategy
|
|
func (s SSEKMSCopyStrategy) String() string {
|
|
switch s {
|
|
case SSEKMSCopyStrategyDirect:
|
|
return "Direct"
|
|
case SSEKMSCopyStrategyDecryptEncrypt:
|
|
return "DecryptEncrypt"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|
|
|
|
// GetSourceSSEKMSInfo extracts SSE-KMS information from source object metadata
|
|
func GetSourceSSEKMSInfo(metadata map[string][]byte) (keyID string, isEncrypted bool) {
|
|
if sseAlgorithm, exists := metadata[s3_constants.AmzServerSideEncryption]; exists && string(sseAlgorithm) == s3_constants.SSEAlgorithmKMS {
|
|
if kmsKeyID, exists := metadata[s3_constants.AmzServerSideEncryptionAwsKmsKeyId]; exists {
|
|
return string(kmsKeyID), true
|
|
}
|
|
return "", true // SSE-KMS with default key
|
|
}
|
|
return "", false
|
|
}
|
|
|
|
// CanDirectCopySSEKMS determines if we can directly copy chunks without decrypt/re-encrypt
|
|
func CanDirectCopySSEKMS(srcMetadata map[string][]byte, destKeyID string) bool {
|
|
srcKeyID, srcEncrypted := GetSourceSSEKMSInfo(srcMetadata)
|
|
|
|
// Case 1: Source unencrypted, destination unencrypted -> Direct copy
|
|
if !srcEncrypted && destKeyID == "" {
|
|
return true
|
|
}
|
|
|
|
// Case 2: Source encrypted with same KMS key as destination -> Direct copy
|
|
if srcEncrypted && destKeyID != "" {
|
|
// Same key if key IDs match (empty means default key)
|
|
return srcKeyID == destKeyID
|
|
}
|
|
|
|
// All other cases require decrypt/re-encrypt
|
|
return false
|
|
}
|
|
|
|
// DetermineSSEKMSCopyStrategy determines the optimal copy strategy for SSE-KMS
|
|
func DetermineSSEKMSCopyStrategy(srcMetadata map[string][]byte, destKeyID string) (SSEKMSCopyStrategy, error) {
|
|
if CanDirectCopySSEKMS(srcMetadata, destKeyID) {
|
|
return SSEKMSCopyStrategyDirect, nil
|
|
}
|
|
return SSEKMSCopyStrategyDecryptEncrypt, nil
|
|
}
|
|
|
|
// ParseSSEKMSCopyHeaders parses SSE-KMS headers from copy request
|
|
func ParseSSEKMSCopyHeaders(r *http.Request) (destKeyID string, encryptionContext map[string]string, bucketKeyEnabled bool, err error) {
|
|
// Check if this is an SSE-KMS request
|
|
if !IsSSEKMSRequest(r) {
|
|
return "", nil, false, nil
|
|
}
|
|
|
|
// Get destination KMS key ID
|
|
destKeyID = r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
|
|
|
|
// Validate key ID if provided
|
|
if destKeyID != "" && !isValidKMSKeyID(destKeyID) {
|
|
return "", nil, false, fmt.Errorf("invalid KMS key ID: %s", destKeyID)
|
|
}
|
|
|
|
// Parse encryption context if provided
|
|
if contextHeader := r.Header.Get(s3_constants.AmzServerSideEncryptionContext); contextHeader != "" {
|
|
contextBytes, decodeErr := base64.StdEncoding.DecodeString(contextHeader)
|
|
if decodeErr != nil {
|
|
return "", nil, false, fmt.Errorf("invalid encryption context encoding: %v", decodeErr)
|
|
}
|
|
|
|
if unmarshalErr := json.Unmarshal(contextBytes, &encryptionContext); unmarshalErr != nil {
|
|
return "", nil, false, fmt.Errorf("invalid encryption context JSON: %v", unmarshalErr)
|
|
}
|
|
}
|
|
|
|
// Parse bucket key enabled flag
|
|
if bucketKeyHeader := r.Header.Get(s3_constants.AmzServerSideEncryptionBucketKeyEnabled); bucketKeyHeader != "" {
|
|
bucketKeyEnabled = strings.ToLower(bucketKeyHeader) == "true"
|
|
}
|
|
|
|
return destKeyID, encryptionContext, bucketKeyEnabled, nil
|
|
}
|
|
|
|
// UnifiedCopyStrategy represents all possible copy strategies across encryption types
|
|
type UnifiedCopyStrategy int
|
|
|
|
const (
|
|
// CopyStrategyDirect - Direct chunk copy (no encryption changes)
|
|
CopyStrategyDirect UnifiedCopyStrategy = iota
|
|
// CopyStrategyEncrypt - Encrypt during copy (plain → encrypted)
|
|
CopyStrategyEncrypt
|
|
// CopyStrategyDecrypt - Decrypt during copy (encrypted → plain)
|
|
CopyStrategyDecrypt
|
|
// CopyStrategyReencrypt - Decrypt and re-encrypt (different keys/methods)
|
|
CopyStrategyReencrypt
|
|
// CopyStrategyKeyRotation - Same object, different key (metadata-only update)
|
|
CopyStrategyKeyRotation
|
|
)
|
|
|
|
// String returns string representation of the unified strategy
|
|
func (s UnifiedCopyStrategy) String() string {
|
|
switch s {
|
|
case CopyStrategyDirect:
|
|
return "Direct"
|
|
case CopyStrategyEncrypt:
|
|
return "Encrypt"
|
|
case CopyStrategyDecrypt:
|
|
return "Decrypt"
|
|
case CopyStrategyReencrypt:
|
|
return "Reencrypt"
|
|
case CopyStrategyKeyRotation:
|
|
return "KeyRotation"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|
|
|
|
// EncryptionState represents the encryption state of source and destination
|
|
type EncryptionState struct {
|
|
SrcSSEC bool
|
|
SrcSSEKMS bool
|
|
SrcSSES3 bool
|
|
DstSSEC bool
|
|
DstSSEKMS bool
|
|
DstSSES3 bool
|
|
SameObject bool
|
|
}
|
|
|
|
// IsSourceEncrypted returns true if source has any encryption
|
|
func (e *EncryptionState) IsSourceEncrypted() bool {
|
|
return e.SrcSSEC || e.SrcSSEKMS || e.SrcSSES3
|
|
}
|
|
|
|
// IsTargetEncrypted returns true if target should be encrypted
|
|
func (e *EncryptionState) IsTargetEncrypted() bool {
|
|
return e.DstSSEC || e.DstSSEKMS || e.DstSSES3
|
|
}
|
|
|
|
// DetermineUnifiedCopyStrategy determines the optimal copy strategy for all encryption types
|
|
func DetermineUnifiedCopyStrategy(state *EncryptionState, srcMetadata map[string][]byte, r *http.Request) (UnifiedCopyStrategy, error) {
|
|
// Key rotation: same object with different encryption
|
|
if state.SameObject && state.IsSourceEncrypted() && state.IsTargetEncrypted() {
|
|
// Check if it's actually a key change
|
|
if state.SrcSSEC && state.DstSSEC {
|
|
// SSE-C key rotation - need to compare keys
|
|
return CopyStrategyKeyRotation, nil
|
|
}
|
|
if state.SrcSSEKMS && state.DstSSEKMS {
|
|
// SSE-KMS key rotation - need to compare key IDs
|
|
srcKeyID, _ := GetSourceSSEKMSInfo(srcMetadata)
|
|
dstKeyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
|
|
if srcKeyID != dstKeyID {
|
|
return CopyStrategyKeyRotation, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Direct copy: no encryption changes
|
|
if !state.IsSourceEncrypted() && !state.IsTargetEncrypted() {
|
|
return CopyStrategyDirect, nil
|
|
}
|
|
|
|
// Same encryption type and key
|
|
if state.SrcSSEKMS && state.DstSSEKMS {
|
|
srcKeyID, _ := GetSourceSSEKMSInfo(srcMetadata)
|
|
dstKeyID := r.Header.Get(s3_constants.AmzServerSideEncryptionAwsKmsKeyId)
|
|
if srcKeyID == dstKeyID {
|
|
return CopyStrategyDirect, nil
|
|
}
|
|
}
|
|
|
|
if state.SrcSSEC && state.DstSSEC {
|
|
// For SSE-C, we'd need to compare the actual keys, but we can't do that securely
|
|
// So we assume different keys and use reencrypt strategy
|
|
return CopyStrategyReencrypt, nil
|
|
}
|
|
|
|
// Encrypt: plain → encrypted
|
|
if !state.IsSourceEncrypted() && state.IsTargetEncrypted() {
|
|
return CopyStrategyEncrypt, nil
|
|
}
|
|
|
|
// Decrypt: encrypted → plain
|
|
if state.IsSourceEncrypted() && !state.IsTargetEncrypted() {
|
|
return CopyStrategyDecrypt, nil
|
|
}
|
|
|
|
// Reencrypt: different encryption types or keys
|
|
if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
|
|
return CopyStrategyReencrypt, nil
|
|
}
|
|
|
|
return CopyStrategyDirect, nil
|
|
}
|
|
|
|
// DetectEncryptionStateWithEntry analyzes the source entry and request headers to determine encryption state
|
|
// This version can detect multipart encrypted objects by examining chunks
|
|
func DetectEncryptionStateWithEntry(entry *filer_pb.Entry, r *http.Request, srcPath, dstPath string) *EncryptionState {
|
|
state := &EncryptionState{
|
|
SrcSSEC: IsSSECEncryptedWithEntry(entry),
|
|
SrcSSEKMS: IsSSEKMSEncryptedWithEntry(entry),
|
|
SrcSSES3: IsSSES3EncryptedInternal(entry.Extended),
|
|
DstSSEC: IsSSECRequest(r),
|
|
DstSSEKMS: IsSSEKMSRequest(r),
|
|
DstSSES3: IsSSES3RequestInternal(r),
|
|
SameObject: srcPath == dstPath,
|
|
}
|
|
|
|
return state
|
|
}
|
|
|
|
// IsSSEKMSEncryptedWithEntry detects SSE-KMS encryption from entry (including multipart objects)
|
|
func IsSSEKMSEncryptedWithEntry(entry *filer_pb.Entry) bool {
|
|
if entry == nil {
|
|
return false
|
|
}
|
|
|
|
// Check object-level metadata first
|
|
if IsSSEKMSEncrypted(entry.Extended) {
|
|
return true
|
|
}
|
|
|
|
// Check for multipart SSE-KMS by examining chunks
|
|
if len(entry.GetChunks()) > 0 {
|
|
for _, chunk := range entry.GetChunks() {
|
|
if chunk.GetSseType() == filer_pb.SSEType_SSE_KMS {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// IsSSECEncryptedWithEntry detects SSE-C encryption from entry (including multipart objects)
|
|
func IsSSECEncryptedWithEntry(entry *filer_pb.Entry) bool {
|
|
if entry == nil {
|
|
return false
|
|
}
|
|
|
|
// Check object-level metadata first
|
|
if IsSSECEncrypted(entry.Extended) {
|
|
return true
|
|
}
|
|
|
|
// Check for multipart SSE-C by examining chunks
|
|
if len(entry.GetChunks()) > 0 {
|
|
for _, chunk := range entry.GetChunks() {
|
|
if chunk.GetSseType() == filer_pb.SSEType_SSE_C {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Helper functions for SSE-C detection are in s3_sse_c.go
|