filer.sync: support per-cluster mTLS with -a.security and -b.security (#8872)

* filer.sync: support per-cluster mTLS with -a.security and -b.security flags

When syncing between two clusters that use different certificate authorities,
a single security.toml cannot authenticate to both. Add -a.security and
-b.security flags so each filer can use its own security.toml for TLS.

Closes #8481

* security: fatal on failure to read explicitly provided security config

When -a.security or -b.security is specified, falling back to insecure
credentials on read error would silently bypass mTLS. Fatal instead.

* fix(filer.sync): use source filer's fromTsMs flag in initOffsetFromTsMs

A→B was using bFromTsMs and B→A was using aFromTsMs — these were
swapped. Each path should seed the target's offset with the source
filer's starting timestamp.

* security: return error from LoadClientTLSFromFile, resolve relative PEM paths

Change LoadClientTLSFromFile to return (grpc.DialOption, error) so
callers can handle failures explicitly instead of a silent insecure
fallback. Resolve relative PEM paths (grpc.ca, grpc.client.cert,
grpc.client.key) against the config file's directory.
This commit is contained in:
Chris Lu
2026-04-01 11:05:43 -07:00
committed by GitHub
parent 44d5cb8f90
commit 8572aae403
3 changed files with 61 additions and 14 deletions

View File

@@ -53,6 +53,8 @@ type SyncOptions struct {
chunkConcurrency *int
aDoDeleteFiles *bool
bDoDeleteFiles *bool
aSecurity *string
bSecurity *string
clientId int32
clientEpoch atomic.Int32
debug *bool
@@ -113,6 +115,8 @@ func init() {
syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port")
syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A")
syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B")
syncOptions.aSecurity = cmdFilerSynchronize.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates")
syncOptions.bSecurity = cmdFilerSynchronize.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates")
syncOptions.debug = cmdFilerSynchronize.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port")
syncOptions.debugPort = cmdFilerSynchronize.Flag.Int("debug.port", 6060, "http port for debugging")
syncOptions.clientId = util.RandomInt32()
@@ -144,6 +148,22 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
util.LoadSecurityConfiguration()
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
// per-filer TLS when clusters use different certificates
grpcDialOptionA := grpcDialOption
grpcDialOptionB := grpcDialOption
if *syncOptions.aSecurity != "" {
var err error
if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncOptions.aSecurity, "grpc.client"); err != nil {
glog.Fatalf("load security config for filer A: %v", err)
}
}
if *syncOptions.bSecurity != "" {
var err error
if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncOptions.bSecurity, "grpc.client"); err != nil {
glog.Fatalf("load security config for filer B: %v", err)
}
}
grace.SetupProfiling(*syncCpuProfile, *syncMemProfile)
filerA := pb.ServerAddress(*syncOptions.filerA)
@@ -153,13 +173,13 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort)
// read a filer signature
aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA)
aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOptionA, filerA)
if aFilerErr != nil {
glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr)
return true
}
// read b filer signature
bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB)
bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOptionB, filerB)
if bFilerErr != nil {
glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr)
return true
@@ -189,9 +209,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
go func() {
// a->b
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
initOffsetError := initOffsetFromTsMs(grpcDialOptionB, filerB, aFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath))
if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError)
os.Exit(2)
}
for {
@@ -199,11 +219,12 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
err := doSubscribeFilerMetaChanges(
syncOptions.clientId,
syncOptions.clientEpoch.Load(),
grpcDialOption,
grpcDialOptionA,
filerA,
*syncOptions.aPath,
util.StringSplit(*syncOptions.aExcludePaths, ","),
*syncOptions.aProxyByFiler,
grpcDialOptionB,
filerB,
*syncOptions.bPath,
*syncOptions.bReplication,
@@ -228,9 +249,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
if !*syncOptions.isActivePassive {
// b->a
// set synchronization start timestamp to offset
initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
initOffsetError := initOffsetFromTsMs(grpcDialOptionA, filerA, bFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath))
if initOffsetError != nil {
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError)
os.Exit(2)
}
go func() {
@@ -239,11 +260,12 @@ func runFilerSynchronize(cmd *Command, args []string) bool {
err := doSubscribeFilerMetaChanges(
syncOptions.clientId,
syncOptions.clientEpoch.Load(),
grpcDialOption,
grpcDialOptionB,
filerB,
*syncOptions.bPath,
util.StringSplit(*syncOptions.bExcludePaths, ","),
*syncOptions.bProxyByFiler,
grpcDialOptionA,
filerA,
*syncOptions.aPath,
*syncOptions.aReplication,
@@ -285,12 +307,12 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd
return nil
}
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string,
func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetGrpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, targetPath string,
replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error {
// if first time, start from now
// if has previously synced, resume from that point of time
sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
sourceFilerOffsetTsNs, err := getOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature)
if err != nil {
return err
}
@@ -300,8 +322,9 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
// create filer sink
filerSource := &source.FilerSource{}
filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler)
filerSource.SetGrpcDialOption(sourceGrpcDialOption)
filerSink := &filersink.FilerSink{}
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler)
filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, targetGrpcDialOption, sinkWriteChunkByFiler)
filerSink.SetChunkConcurrency(chunkConcurrency)
filerSink.SetSourceFiler(filerSource)
@@ -328,7 +351,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
if statePtr != nil {
statePtr.Store(&syncState{
processor: processor,
grpcDialOption: grpcDialOption,
grpcDialOption: targetGrpcDialOption,
targetFiler: targetFiler,
sourcePath: sourcePath,
sourceFilerSignature: sourceFilerSignature,
@@ -351,7 +374,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
lastLogTsNs = now
// collect synchronous offset
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)
})
prefix := sourcePath
@@ -372,7 +395,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti
EventErrorType: pb.RetryForeverOnError,
}
return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset)
return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset)
}