seaweedFS/weed/replication/sink/azuresink/azure_sink.go
Mmx233 3cea900241 fix: replication sinks upload ciphertext for SSE-encrypted objects (#8931)
* fix: decrypt SSE-encrypted objects in S3 replication sink

* fix: add SSE decryption support to GCS, Azure, B2, Local sinks

* fix: return error instead of warning for SSE-C objects during replication

* fix: close readers after upload to prevent resource leaks

* fix: return error for unknown SSE types instead of passing through ciphertext

* refactor(repl_util): extract CloseReader/CloseMaybeDecryptedReader helpers

The io.Closer close-on-error and defer-close pattern was duplicated in
copyWithDecryption and the S3 sink. Extract exported helpers to keep a
single implementation and prevent future divergence.
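
For illustration, a minimal sketch of what such helpers could look like; the names come from this commit, but their exact signatures in weed/replication/repl_util may differ.

package repl_util // sketch only; the real helpers live in weed/replication/repl_util

import "io"

// CloseReader closes r if it also implements io.Closer. The close error is
// deliberately ignored, so the helper is safe from both defer and error paths.
func CloseReader(r io.Reader) {
	if c, ok := r.(io.Closer); ok {
		_ = c.Close()
	}
}

// CloseMaybeDecryptedReader closes the decrypting wrapper first (when it is a
// distinct reader) and then the underlying source, so neither leaks.
func CloseMaybeDecryptedReader(decrypted, original io.Reader) {
	if decrypted != nil && decrypted != original {
		CloseReader(decrypted)
	}
	CloseReader(original)
}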

* fix(repl_util): warn on mixed SSE types across chunks in detectSSEType

detectSSEType previously returned the SSE type of the first encrypted
chunk without inspecting the rest. If an entry somehow has chunks with
different SSE types, only the first type's decryption would be applied.
Now scans all chunks and logs a warning on mismatch.

* fix(repl_util): decrypt inline SSE objects during replication

Small SSE-encrypted objects stored in entry.Content were being copied
as ciphertext because:
1. detectSSEType only checked chunk metadata, but inline objects have
   no chunks — now falls back to checking entry.Extended for SSE keys
2. Non-S3 sinks short-circuited on len(entry.Content)>0, bypassing
   the decryption path — now call MaybeDecryptContent before writing

Adds MaybeDecryptContent helper for decrypting inline byte content.
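
A rough sketch of the inline fallback, assuming the extended-attribute key names mentioned later in this message (SeaweedFSSSES3Key, SeaweedFSSSEKMSKey); the real detectSSEType in repl_util may use different constants and types.

package repl_util // sketch only

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

const (
	sseNone = ""
	sseS3   = "SSE-S3"
	sseKMS  = "SSE-KMS"
)

// detectInlineSSEType inspects entry.Extended when an object has no chunks,
// so small inline objects are still recognized as SSE-encrypted.
func detectInlineSSEType(entry *filer_pb.Entry) (string, error) {
	_, hasS3 := entry.Extended["SeaweedFSSSES3Key"]   // assumed key name
	_, hasKMS := entry.Extended["SeaweedFSSSEKMSKey"] // assumed key name
	switch {
	case hasS3 && hasKMS:
		return sseNone, fmt.Errorf("conflicting SSE metadata on %s", entry.Name)
	case hasS3:
		return sseS3, nil
	case hasKMS:
		return sseKMS, nil
	default:
		return sseNone, nil
	}
}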

* fix(repl_util): add KMS initialization for replication SSE decryption

SSE-KMS decryption was not wired up for filer.backup — the only
initialization was for the SSE-S3 key manager. CreateSSEKMSDecryptedReader
requires a global KMS provider, which is only loaded by the S3 API
auth-config path.

Add InitializeSSEForReplication helper that initializes both SSE-S3
(from filer KEK) and SSE-KMS (from Viper config [kms] section /
WEED_KMS_* env vars). Replace the SSE-S3-only init in filer_backup.go.

* fix(replicator): initialize SSE decryption for filer.replicate

The SSE decryption setup was only added to filer_backup.go, but the
notification-based replicator (filer.replicate) uses the same sinks
and was missing the required initialization. Add SSE init in
NewReplicator so filer.replicate can decrypt SSE objects.

* refactor(repl_util): fold entry param into CopyFromChunkViews

Remove the CopyFromChunkViewsWithEntry wrapper and add the entry
parameter directly to CopyFromChunkViews, since all callers already
pass it.

* fix(repl_util): guard SSE init with sync.Once, error on mixed SSE types

InitializeWithFiler overwrites the global superKey on every call.
Wrap InitializeSSEForReplication with sync.Once so repeated calls
(e.g. from NewReplicator) are safe.

detectSSEType now returns an error instead of logging a warning when
chunks have inconsistent SSE types, so replication aborts rather than
silently applying the wrong decryption to some chunks.
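
Conceptually the check reduces to something like the sketch below, operating on the per-chunk SSE types after they have been read from chunk metadata; names are illustrative, not the actual repl_util code.

package repl_util // sketch only

import "fmt"

// uniformSSEType returns the single SSE type shared by all encrypted chunks,
// or an error when chunks disagree, so replication aborts instead of applying
// the wrong decryption to part of the object.
func uniformSSEType(chunkTypes []string) (string, error) {
	detected := ""
	for _, t := range chunkTypes {
		if t == "" {
			continue // plaintext chunk
		}
		if detected == "" {
			detected = t
			continue
		}
		if t != detected {
			return "", fmt.Errorf("inconsistent SSE types across chunks: %s vs %s", detected, t)
		}
	}
	return detected, nil
}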

* fix(repl_util): allow SSE init retry, detect conflicting metadata, add tests

- Replace sync.Once with mutex+bool so transient failures (e.g. filer
  unreachable) don't permanently prevent initialization. Only successful
  init flips the flag; failed attempts allow retries (see the sketch after
  this list).

- Remove v.IsSet("kms") guard that prevented env-only KMS configs
  (WEED_KMS_*) from being detected. Always attempt KMS loading and let
  LoadConfigurations handle "no config found".

- detectSSEType now checks for conflicting extended metadata keys
  (e.g. both SeaweedFSSSES3Key and SeaweedFSSSEKMSKey present) and
  returns an error instead of silently picking the first match.

- Add table-driven tests for detectSSEType, MaybeDecryptReader, and
  MaybeDecryptContent covering plaintext, uniform SSE, mixed chunks,
  inline SSE via extended metadata, conflicting metadata, and SSE-C.
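
As a sketch of the guard described in the first bullet, with doInit standing in for the real SSE-S3/SSE-KMS setup (names are illustrative).

package repl_util // sketch only

import "sync"

var (
	sseInitMu   sync.Mutex
	sseInitDone bool
)

// initSSEOnce runs doInit at most once successfully. Unlike sync.Once, a
// failed attempt (e.g. the filer being unreachable) leaves the flag unset,
// so a later caller such as NewReplicator can retry the initialization.
func initSSEOnce(doInit func() error) error {
	sseInitMu.Lock()
	defer sseInitMu.Unlock()
	if sseInitDone {
		return nil
	}
	if err := doInit(); err != nil {
		return err // flag stays false; the next call retries
	}
	sseInitDone = true
	return nil
}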

* test(repl_util): add SSE-S3 and SSE-KMS integration tests

Add round-trip encryption/decryption tests:
- SSE-S3: encrypt with CreateSSES3EncryptedReader, decrypt with
  CreateSSES3DecryptedReader, verify plaintext matches
- SSE-KMS: encrypt with AES-CTR, wire a mock KMSProvider via
  SetGlobalKMSProvider, build serialized KMS metadata, verify
  MaybeDecryptReader and MaybeDecryptContent produce correct plaintext

Fix existing tests to check io.ReadAll errors.

* test(repl_util): exercise full SSE-S3 path through MaybeDecryptReader

Replace direct CreateSSES3DecryptedReader calls with end-to-end tests
that go through MaybeDecryptReader → decryptSSES3 →
DeserializeSSES3Metadata → GetSSES3IV → CreateSSES3DecryptedReader.

Uses WEED_S3_SSE_KEK env var + a mock filer client to initialize the
global key manager with a test KEK, then SerializeSSES3Metadata to
build proper envelope-encrypted metadata. Cleanup restores the key
manager state.

* fix(localsink): write to temp file to prevent truncated replicas

The local sink truncated the destination file before writing content.
If decryption or chunk copy failed, the file was left empty/truncated,
destroying the previous replica.

Write to a temp file in the same directory and atomically rename on
success. On any error the temp file is cleaned up and the existing
replica is untouched.
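
This is the usual write-temp-then-rename pattern; a minimal sketch follows (the real local sink streams chunks through its write function rather than taking a byte slice).

package localsink // sketch only

import (
	"os"
	"path/filepath"
)

// writeFileAtomically writes data to a temp file in the destination directory
// and renames it into place only on success, so a failed write can never
// leave the existing replica truncated.
func writeFileAtomically(dst string, data []byte, perm os.FileMode) error {
	tmp, err := os.CreateTemp(filepath.Dir(dst), filepath.Base(dst)+".tmp-*")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	cleanup := func(werr error) error {
		tmp.Close()        // best effort; the write already failed
		os.Remove(tmpName) // leave the previous replica untouched
		return werr
	}
	if _, err := tmp.Write(data); err != nil {
		return cleanup(err)
	}
	if err := tmp.Chmod(perm); err != nil {
		return cleanup(err)
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmpName)
		return err
	}
	// Same-directory rename is atomic on POSIX filesystems.
	return os.Rename(tmpName, dst)
}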

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
2026-04-06 00:32:27 -07:00

package azuresink

import (
	"bytes"
	"context"
	"fmt"
	"strings"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/remote_storage/azure"
	"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
	"github.com/seaweedfs/seaweedfs/weed/replication/sink"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

type AzureSink struct {
	client        *azblob.Client
	container     string
	dir           string
	filerSource   *source.FilerSource
	isIncremental bool
}

func init() {
	sink.Sinks = append(sink.Sinks, &AzureSink{})
}

func (g *AzureSink) GetName() string {
	return "azure"
}

func (g *AzureSink) GetSinkToDirectory() string {
	return g.dir
}

func (g *AzureSink) IsIncremental() bool {
	return g.isIncremental
}

func (g *AzureSink) Initialize(configuration util.Configuration, prefix string) error {
	g.isIncremental = configuration.GetBool(prefix + "is_incremental")
	return g.initialize(
		configuration.GetString(prefix+"account_name"),
		configuration.GetString(prefix+"account_key"),
		configuration.GetString(prefix+"container"),
		configuration.GetString(prefix+"directory"),
	)
}

func (g *AzureSink) SetSourceFiler(s *source.FilerSource) {
	g.filerSource = s
}

func (g *AzureSink) initialize(accountName, accountKey, container, dir string) error {
	g.container = container
	g.dir = dir

	// Create credential and client
	credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		return fmt.Errorf("failed to create Azure credential with account name:%s: %w", accountName, err)
	}

	serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
	client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, azure.DefaultAzBlobClientOptions())
	if err != nil {
		return fmt.Errorf("failed to create Azure client: %w", err)
	}
	g.client = client

	// Validate that the container exists early to catch configuration errors
	containerClient := client.ServiceClient().NewContainerClient(container)
	ctxValidate, cancelValidate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelValidate()
	_, err = containerClient.GetProperties(ctxValidate, nil)
	if err != nil {
		if bloberror.HasCode(err, bloberror.ContainerNotFound) {
			return fmt.Errorf("Azure container '%s' does not exist. Please create it first", container)
		}
		return fmt.Errorf("failed to validate Azure container '%s': %w", container, err)
	}

	return nil
}

func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
	key = cleanKey(key)

	if isDirectory {
		key = key + "/"
	}

	blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
	ctxDelete, cancelDelete := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelDelete()
	_, err := blobClient.Delete(ctxDelete, &blob.DeleteOptions{
		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
	})
	if err != nil {
		// Make delete idempotent - don't return error if blob doesn't exist
		if bloberror.HasCode(err, bloberror.BlobNotFound) {
			return nil
		}
		return fmt.Errorf("azure delete %s/%s: %w", g.container, key, err)
	}

	return nil
}

func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
	key = cleanKey(key)

	if entry.IsDirectory {
		return nil
	}

	totalSize := filer.FileSize(entry)
	chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))

	// Create append blob client
	appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)

	// Try to create the blob first (without access conditions for initial creation)
	ctxCreate, cancelCreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelCreate()
	_, err := appendBlobClient.Create(ctxCreate, nil)
	needsWrite := true
	freshlyCreated := false
	if err != nil {
		if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
			// Handle existing blob - check if overwrite is needed and perform it if necessary
			var handleErr error
			needsWrite, handleErr = g.handleExistingBlob(appendBlobClient, key, entry, totalSize)
			if handleErr != nil {
				return handleErr
			}
			// handleExistingBlob recreates the blob when needsWrite is true
			freshlyCreated = needsWrite
		} else {
			return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
		}
	} else {
		freshlyCreated = true
	}

	// If we don't need to write (blob is up-to-date), return early
	if !needsWrite {
		return nil
	}

	// cleanupOnError deletes a freshly created blob when content write fails,
	// preventing empty blobs from being left behind.
	cleanupOnError := func(writeErr error) error {
		if !freshlyCreated {
			return writeErr
		}
		glog.Warningf("azure sink: cleaning up empty blob %s/%s after write failure: %v", g.container, key, writeErr)
		ctxCleanup, cancelCleanup := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
		defer cancelCleanup()
		if _, delErr := appendBlobClient.Delete(ctxCleanup, nil); delErr != nil {
			if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
				glog.Warningf("azure sink: failed to clean up blob %s/%s: %v", g.container, key, delErr)
			}
		}
		return writeErr
	}

	writeFunc := func(data []byte) error {
		ctxWrite, cancelWrite := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
		defer cancelWrite()
		_, writeErr := appendBlobClient.AppendBlock(ctxWrite, streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
		return writeErr
	}

	if len(entry.Content) > 0 {
		content, err := repl_util.MaybeDecryptContent(entry.Content, entry)
		if err != nil {
			return cleanupOnError(fmt.Errorf("decrypt inline SSE content: %w", err))
		}
		if err := writeFunc(content); err != nil {
			return cleanupOnError(err)
		}
		return nil
	}

	if err := repl_util.CopyFromChunkViews(chunkViews, g.filerSource, writeFunc, entry); err != nil {
		return cleanupOnError(err)
	}

	return nil
}

// handleExistingBlob determines whether an existing blob needs to be overwritten and performs the overwrite if necessary.
// It returns:
// - needsWrite: true if the caller should write data to the blob, false if the blob is already up-to-date
// - error: any error encountered during the operation
func (g *AzureSink) handleExistingBlob(appendBlobClient *appendblob.Client, key string, entry *filer_pb.Entry, totalSize uint64) (needsWrite bool, err error) {
	// Get the blob's properties to decide whether to overwrite.
	// Use a timeout to fail fast on network issues.
	ctxProps, cancelProps := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelProps()
	props, propErr := appendBlobClient.GetProperties(ctxProps, nil)
	// Fail fast if we cannot fetch properties - we should not proceed to delete without knowing the blob state.
	if propErr != nil {
		return false, fmt.Errorf("azure get properties %s/%s: %w", g.container, key, propErr)
	}

	// Check if we can skip writing based on modification time and size.
	if entry.Attributes != nil && entry.Attributes.Mtime > 0 && props.LastModified != nil && props.ContentLength != nil {
		const clockSkewTolerance = int64(2) // seconds - allow small clock differences
		remoteMtime := props.LastModified.Unix()
		localMtime := entry.Attributes.Mtime
		// Skip if remote is newer/same (within skew tolerance) and has the SAME size.
		// This prevents skipping partial/corrupted files that may have a newer mtime.
		if remoteMtime >= localMtime-clockSkewTolerance && *props.ContentLength == int64(totalSize) {
			glog.V(2).Infof("skip overwriting %s/%s: remote is up-to-date (remote mtime: %d >= local mtime: %d, size: %d)",
				g.container, key, remoteMtime, localMtime, *props.ContentLength)
			return false, nil
		}
	}

	// Blob is empty or outdated - we need to delete and recreate it.
	// REQUIRE ETag for conditional delete to avoid race conditions and data loss.
	if props.ETag == nil {
		return false, fmt.Errorf("azure blob %s/%s: missing ETag; refusing to delete without conditional", g.container, key)
	}
	deleteOpts := &blob.DeleteOptions{
		DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
		AccessConditions: &blob.AccessConditions{
			ModifiedAccessConditions: &blob.ModifiedAccessConditions{
				IfMatch: props.ETag,
			},
		},
	}

	// Delete existing blob with conditional delete and timeout.
	ctxDel, cancelDel := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelDel()
	_, delErr := appendBlobClient.Delete(ctxDel, deleteOpts)
	if delErr != nil {
		// If the precondition fails, the blob was modified by another process after we checked it.
		// Failing here is safe; replication will retry.
		if bloberror.HasCode(delErr, bloberror.ConditionNotMet) {
			return false, fmt.Errorf("azure blob %s/%s was modified concurrently, preventing overwrite: %w", g.container, key, delErr)
		}
		// Ignore BlobNotFound, as the goal is to delete it anyway.
		if !bloberror.HasCode(delErr, bloberror.BlobNotFound) {
			return false, fmt.Errorf("azure delete existing blob %s/%s: %w", g.container, key, delErr)
		}
	}

	// Recreate the blob with timeout.
	ctxRecreate, cancelRecreate := context.WithTimeout(context.Background(), azure.DefaultAzureOpTimeout)
	defer cancelRecreate()
	_, createErr := appendBlobClient.Create(ctxRecreate, nil)
	if createErr != nil {
		// It's possible another process recreated it after our delete.
		// Failing is safe, as a retry of the whole function will handle it.
		return false, fmt.Errorf("azure recreate append blob %s/%s: %w", g.container, key, createErr)
	}

	return true, nil
}

func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
	key = cleanKey(key)
	return true, g.CreateEntry(key, newEntry, signatures)
}

func cleanKey(key string) string {
	// Remove all leading slashes (TrimLeft handles multiple slashes, unlike TrimPrefix)
	return strings.TrimLeft(key, "/")
}