* fix: decrypt SSE-encrypted objects in S3 replication sink
* fix: add SSE decryption support to GCS, Azure, B2, Local sinks
* fix: return error instead of warning for SSE-C objects during replication
* fix: close readers after upload to prevent resource leaks
* fix: return error for unknown SSE types instead of passing through ciphertext
* refactor(repl_util): extract CloseReader/CloseMaybeDecryptedReader helpers
The io.Closer close-on-error and defer-close pattern was duplicated in
copyWithDecryption and the S3 sink. Extract exported helpers to keep a
single implementation and prevent future divergence.
* fix(repl_util): warn on mixed SSE types across chunks in detectSSEType
detectSSEType previously returned the SSE type of the first encrypted
chunk without inspecting the rest. If an entry somehow has chunks with
different SSE types, only the first type's decryption would be applied.
Now scans all chunks and logs a warning on mismatch.
* fix(repl_util): decrypt inline SSE objects during replication
Small SSE-encrypted objects stored in entry.Content were being copied
as ciphertext because:
1. detectSSEType only checked chunk metadata, but inline objects have
   no chunks. It now falls back to checking entry.Extended for SSE keys.
2. Non-S3 sinks short-circuited on len(entry.Content)>0, bypassing
   the decryption path. They now call MaybeDecryptContent before writing.
Adds MaybeDecryptContent helper for decrypting inline byte content.
* fix(repl_util): add KMS initialization for replication SSE decryption
SSE-KMS decryption was not wired up for filer.backup — the only
initialization was for SSE-S3 key manager. CreateSSEKMSDecryptedReader
requires a global KMS provider which is only loaded by the S3 API
auth-config path.
Add InitializeSSEForReplication helper that initializes both SSE-S3
(from filer KEK) and SSE-KMS (from Viper config [kms] section /
WEED_KMS_* env vars). Replace the SSE-S3-only init in filer_backup.go.
* fix(replicator): initialize SSE decryption for filer.replicate
The SSE decryption setup was only added to filer_backup.go, but the
notification-based replicator (filer.replicate) uses the same sinks
and was missing the required initialization. Add SSE init in
NewReplicator so filer.replicate can decrypt SSE objects.
* refactor(repl_util): fold entry param into CopyFromChunkViews
Remove the CopyFromChunkViewsWithEntry wrapper and add the entry
parameter directly to CopyFromChunkViews, since all callers already
pass it.
* fix(repl_util): guard SSE init with sync.Once, error on mixed SSE types
InitializeWithFiler overwrites the global superKey on every call.
Wrap InitializeSSEForReplication with sync.Once so repeated calls
(e.g. from NewReplicator) are safe.
detectSSEType now returns an error instead of logging a warning when
chunks have inconsistent SSE types, so replication aborts rather than
silently applying the wrong decryption to some chunks.
* fix(repl_util): allow SSE init retry, detect conflicting metadata, add tests
- Replace sync.Once with mutex+bool so transient failures (e.g. filer
  unreachable) don't permanently prevent initialization. Only successful
  init flips the flag; failed attempts allow retries.
- Remove v.IsSet("kms") guard that prevented env-only KMS configs
  (WEED_KMS_*) from being detected. Always attempt KMS loading and let
  LoadConfigurations handle "no config found".
- detectSSEType now checks for conflicting extended metadata keys
  (e.g. both SeaweedFSSSES3Key and SeaweedFSSSEKMSKey present) and
  returns an error instead of silently picking the first match.
- Add table-driven tests for detectSSEType, MaybeDecryptReader, and
  MaybeDecryptContent covering plaintext, uniform SSE, mixed chunks,
  inline SSE via extended metadata, conflicting metadata, and SSE-C.
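The mutex+bool replacement for sync.Once described in the first item might look roughly like this. The retryableInit type and the errNotReady value are hypothetical names used only for illustration, not the identifiers in repl_util.

```go
package main

import (
	"errors"
	"sync"
)

// errNotReady is a hypothetical transient failure, e.g. filer unreachable.
var errNotReady = errors.New("dependency not ready")

// retryableInit runs an initializer until it has succeeded once.
// Unlike sync.Once, a failed attempt does not consume the single chance.
type retryableInit struct {
	mu   sync.Mutex
	done bool
}

// Do calls fn unless an earlier call already succeeded.
func (r *retryableInit) Do(fn func() error) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.done {
		return nil
	}
	if err := fn(); err != nil {
		return err // flag stays false, so the next caller retries
	}
	r.done = true
	return nil
}
```

Holding the mutex across fn serializes concurrent initializers, which keeps the global state written by fn from being overwritten by a second in-flight attempt.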
* test(repl_util): add SSE-S3 and SSE-KMS integration tests
Add round-trip encryption/decryption tests:
- SSE-S3: encrypt with CreateSSES3EncryptedReader, decrypt with
  CreateSSES3DecryptedReader, verify plaintext matches
- SSE-KMS: encrypt with AES-CTR, wire a mock KMSProvider via
  SetGlobalKMSProvider, build serialized KMS metadata, verify
  MaybeDecryptReader and MaybeDecryptContent produce correct plaintext
Fix existing tests to check io.ReadAll errors.
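The SSE-KMS round trip above encrypts test data with AES-CTR. As a standalone illustration of why one routine can serve both directions, here is a sketch using only the Go standard library; ctrXOR is a hypothetical helper name and is not part of the SeaweedFS test suite.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"errors"
)

// ctrXOR XORs data with the AES-CTR keystream derived from key and iv.
// CTR mode is symmetric, so applying it twice with the same key and iv
// returns the original bytes.
func ctrXOR(key, iv, data []byte) ([]byte, error) {
	block, err := aes.NewCipher(key) // key must be 16, 24, or 32 bytes
	if err != nil {
		return nil, err
	}
	if len(iv) != aes.BlockSize {
		return nil, errors.New("iv must be exactly one AES block (16 bytes)")
	}
	out := make([]byte, len(data))
	cipher.NewCTR(block, iv).XORKeyStream(out, data)
	return out, nil
}
```

A round-trip test calls ctrXOR once to produce ciphertext and once more on that ciphertext, then compares the result with the original plaintext.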
* test(repl_util): exercise full SSE-S3 path through MaybeDecryptReader
Replace direct CreateSSES3DecryptedReader calls with end-to-end tests
that go through MaybeDecryptReader → decryptSSES3 →
DeserializeSSES3Metadata → GetSSES3IV → CreateSSES3DecryptedReader.
Uses WEED_S3_SSE_KEK env var + a mock filer client to initialize the
global key manager with a test KEK, then SerializeSSES3Metadata to
build proper envelope-encrypted metadata. Cleanup restores the key
manager state.
* fix(localsink): write to temp file to prevent truncated replicas
The local sink truncated the destination file before writing content.
If decryption or chunk copy failed, the file was left empty/truncated,
destroying the previous replica.
Write to a temp file in the same directory and atomically rename on
success. On any error the temp file is cleaned up and the existing
replica is untouched.
---------
Co-authored-by: Chris Lu <chris.lu@gmail.com>
244 lines
10 KiB
Go
package command

import (
	"errors"
	"fmt"
	nethttp "net/http"
	"regexp"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
	"github.com/seaweedfs/seaweedfs/weed/replication/source"
	"github.com/seaweedfs/seaweedfs/weed/security"
	"github.com/seaweedfs/seaweedfs/weed/util"
	"github.com/seaweedfs/seaweedfs/weed/util/http"
	"github.com/seaweedfs/seaweedfs/weed/util/wildcard"
	"google.golang.org/grpc"
)

type FilerBackupOptions struct {
	isActivePassive     *bool
	filer               *string
	path                *string
	excludePaths        *string
	excludeFileName     *string // deprecated: use excludeFileNames
	excludeFileNames    *string
	excludePathPatterns *string
	debug               *bool
	proxyByFiler        *bool
	doDeleteFiles       *bool
	disableErrorRetry   *bool
	ignore404Error      *bool
	timeAgo             *time.Duration
	retentionDays       *int
}

var (
	filerBackupOptions    FilerBackupOptions
	ignorable404ErrString = fmt.Sprintf("%d %s: %s", nethttp.StatusNotFound, nethttp.StatusText(nethttp.StatusNotFound), http.ErrNotFound.Error())
)

func init() {
	cmdFilerBackup.Run = runFilerBackup // break init cycle
	filerBackupOptions.filer = cmdFilerBackup.Flag.String("filer", "localhost:8888", "filer of one SeaweedFS cluster")
	filerBackupOptions.path = cmdFilerBackup.Flag.String("filerPath", "/", "directory to sync on filer")
	filerBackupOptions.excludePaths = cmdFilerBackup.Flag.String("filerExcludePaths", "", "exclude directories to sync on filer")
	filerBackupOptions.excludeFileName = cmdFilerBackup.Flag.String("filerExcludeFileName", "", "[DEPRECATED: use -filerExcludeFileNames] exclude file names that match the regexp")
	filerBackupOptions.excludeFileNames = cmdFilerBackup.Flag.String("filerExcludeFileNames", "", "comma-separated wildcard patterns to exclude file names, e.g., \"*.tmp,._*\"")
	filerBackupOptions.excludePathPatterns = cmdFilerBackup.Flag.String("filerExcludePathPatterns", "", "comma-separated wildcard patterns to exclude paths where any component matches, e.g., \".snapshot,temp*\"")
	filerBackupOptions.proxyByFiler = cmdFilerBackup.Flag.Bool("filerProxy", false, "read and write file chunks by filer instead of volume servers")
	filerBackupOptions.doDeleteFiles = cmdFilerBackup.Flag.Bool("doDeleteFiles", false, "delete files on the destination")
	filerBackupOptions.debug = cmdFilerBackup.Flag.Bool("debug", false, "debug mode to print out received files")
	filerBackupOptions.timeAgo = cmdFilerBackup.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
	filerBackupOptions.retentionDays = cmdFilerBackup.Flag.Int("retentionDays", 0, "incremental backup retention days")
	filerBackupOptions.disableErrorRetry = cmdFilerBackup.Flag.Bool("disableErrorRetry", false, "disables errors retry, only logs will print")
	filerBackupOptions.ignore404Error = cmdFilerBackup.Flag.Bool("ignore404Error", true, "ignore 404 errors from filer")
}

var cmdFilerBackup = &Command{
	UsageLine: "filer.backup -filer=<filerHost>:<filerPort> ",
	Short:     "resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml",
	Long: `resume-able continuously replicate files from a SeaweedFS cluster to another location defined in replication.toml

	filer.backup listens on filer notifications. If any file is updated, it will fetch the updated content,
	and write to the destination. This is to replace filer.replicate command since additional message queue is not needed.

	If restarted and "-timeAgo" is not set, the synchronization will resume from the previous checkpoints, persisted every minute.
	A fresh sync will start from the earliest metadata logs. To reset the checkpoints, just set "-timeAgo" to a high value.

`,
}

func runFilerBackup(cmd *Command, args []string) bool {

	util.LoadSecurityConfiguration()
	util.LoadConfiguration("replication", true)

	// Compile exclude patterns once before the retry loop: an invalid
	// pattern is a configuration error and must not be retried.
	reExcludeFileName, err := compileExcludePattern(*filerBackupOptions.excludeFileName, "exclude file name")
	if err != nil {
		glog.Fatalf("invalid -filerExcludeFileName: %v", err)
	}
	excludeFileNames := wildcard.CompileWildcardMatchers(*filerBackupOptions.excludeFileNames)
	excludePathPatterns := wildcard.CompileWildcardMatchers(*filerBackupOptions.excludePathPatterns)

	grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")

	clientId := util.RandomInt32()
	var clientEpoch int32

	for {
		clientEpoch++
		err := doFilerBackup(grpcDialOption, &filerBackupOptions, reExcludeFileName, excludeFileNames, excludePathPatterns, clientId, clientEpoch)
		if err != nil {
			glog.Errorf("backup from %s: %v", *filerBackupOptions.filer, err)
			time.Sleep(1747 * time.Millisecond)
		}
	}
}

const (
	BackupKeyPrefix = "backup."
)

func doFilerBackup(grpcDialOption grpc.DialOption, backupOption *FilerBackupOptions, reExcludeFileName *regexp.Regexp, excludeFileNames []*wildcard.WildcardMatcher, excludePathPatterns []*wildcard.WildcardMatcher, clientId int32, clientEpoch int32) error {

	// find data sink
	dataSink := findSink(util.GetViper())
	if dataSink == nil {
		return fmt.Errorf("no data sink configured in replication.toml")
	}

	sourceFiler := pb.ServerAddress(*backupOption.filer)
	sourcePath := *backupOption.path
	excludePaths := util.StringSplit(*backupOption.excludePaths, ",")
	timeAgo := *backupOption.timeAgo
	targetPath := dataSink.GetSinkToDirectory()
	debug := *backupOption.debug

	// get start time for the data sink
	startFrom := time.Unix(0, 0)
	sinkId := util.HashStringToLong(dataSink.GetName() + dataSink.GetSinkToDirectory())
	if timeAgo.Milliseconds() == 0 {
		lastOffsetTsNs, err := getOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId))
		if err != nil {
			glog.V(0).Infof("starting from %v", startFrom)
		} else {
			startFrom = time.Unix(0, lastOffsetTsNs)
			glog.V(0).Infof("resuming from %v", startFrom)
		}
	} else {
		startFrom = time.Now().Add(-timeAgo)
		glog.V(0).Infof("start time is set to %v", startFrom)
	}

	// create filer source
	filerSource := &source.FilerSource{}
	filerSource.DoInitialize(
		sourceFiler.ToHttpAddress(),
		sourceFiler.ToGrpcAddress(),
		sourcePath,
		*backupOption.proxyByFiler)

	if err := repl_util.InitializeSSEForReplication(filerSource); err != nil {
		return fmt.Errorf("SSE initialization failed: %v", err)
	}
	dataSink.SetSourceFiler(filerSource)

	var processEventFn func(*filer_pb.SubscribeMetadataResponse) error
	if *backupOption.ignore404Error {
		processEventFnGenerated := genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, excludeFileNames, excludePathPatterns, dataSink, *backupOption.doDeleteFiles, debug)
		processEventFn = func(resp *filer_pb.SubscribeMetadataResponse) error {
			err := processEventFnGenerated(resp)
			if err == nil {
				return nil
			}
			if isIgnorable404(err) {
				glog.V(0).Infof("got 404 error for %s, ignore it: %s", getSourceKey(resp), err.Error())
				return nil
			}
			return err
		}
	} else {
		processEventFn = genProcessFunction(sourcePath, targetPath, excludePaths, reExcludeFileName, excludeFileNames, excludePathPatterns, dataSink, *backupOption.doDeleteFiles, debug)
	}

	processEventFnWithOffset := pb.AddOffsetFunc(processEventFn, 3*time.Second, func(counter int64, lastTsNs int64) error {
		glog.V(0).Infof("backup %s progressed to %v %0.2f/sec", sourceFiler, time.Unix(0, lastTsNs), float64(counter)/float64(3))
		return setOffset(grpcDialOption, sourceFiler, BackupKeyPrefix, int32(sinkId), lastTsNs)
	})

	if dataSink.IsIncremental() && *filerBackupOptions.retentionDays > 0 {
		go func() {
			for {
				now := time.Now()
				time.Sleep(time.Hour * 24)
				key := util.Join(targetPath, now.Add(-1*time.Hour*24*time.Duration(*filerBackupOptions.retentionDays)).Format("2006-01-02"))
				_ = dataSink.DeleteEntry(util.Join(targetPath, key), true, true, nil)
				glog.V(0).Infof("incremental backup delete directory:%s", key)
			}
		}()
	}

	prefix := sourcePath
	if !strings.HasSuffix(prefix, "/") {
		prefix = prefix + "/"
	}

	eventErrorType := pb.RetryForeverOnError
	if *backupOption.disableErrorRetry {
		eventErrorType = pb.TrivialOnError
	}

	metadataFollowOption := &pb.MetadataFollowOption{
		ClientName:             "backup_" + dataSink.GetName(),
		ClientId:               clientId,
		ClientEpoch:            clientEpoch,
		SelfSignature:          0,
		PathPrefix:             prefix,
		AdditionalPathPrefixes: nil,
		DirectoriesToWatch:     nil,
		StartTsNs:              startFrom.UnixNano(),
		StopTsNs:               0,
		EventErrorType:         eventErrorType,
	}

	return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)

}

func getSourceKey(resp *filer_pb.SubscribeMetadataResponse) string {
	if resp == nil || resp.EventNotification == nil {
		return ""
	}
	message := resp.EventNotification
	if message.NewEntry != nil {
		return string(util.FullPath(message.NewParentPath).Child(message.NewEntry.Name))
	}
	if message.OldEntry != nil {
		return string(util.FullPath(resp.Directory).Child(message.OldEntry.Name))
	}
	return ""
}

// isIgnorable404 returns true if the error represents a 404/not-found condition
// that should be silently ignored during backup. This covers:
//   - errors wrapping http.ErrNotFound (direct volume server 404 via non-S3 sinks)
//   - errors containing the "404 Not Found: not found" status string (S3 sink path
//     where AWS SDK breaks the errors.Is unwrap chain)
//   - LookupFileId or volume-id-not-found errors from the volume id map
func isIgnorable404(err error) bool {
	if errors.Is(err, http.ErrNotFound) {
		return true
	}
	errStr := err.Error()
	return strings.Contains(errStr, ignorable404ErrString) ||
		strings.Contains(errStr, "LookupFileId") ||
		(strings.Contains(errStr, "volume id") && strings.Contains(errStr, "not found"))
}