diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 2e3824d72..2f4aa43f2 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -53,6 +53,8 @@ type SyncOptions struct { chunkConcurrency *int aDoDeleteFiles *bool bDoDeleteFiles *bool + aSecurity *string + bSecurity *string clientId int32 clientEpoch atomic.Int32 debug *bool @@ -113,6 +115,8 @@ func init() { syncOptions.metricsHttpPort = cmdFilerSynchronize.Flag.Int("metricsPort", 0, "metrics listen port") syncOptions.aDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("a.doDeleteFiles", true, "delete and update files when synchronizing on filer A") syncOptions.bDoDeleteFiles = cmdFilerSynchronize.Flag.Bool("b.doDeleteFiles", true, "delete and update files when synchronizing on filer B") + syncOptions.aSecurity = cmdFilerSynchronize.Flag.String("a.security", "", "security.toml file for filer A when clusters use different certificates") + syncOptions.bSecurity = cmdFilerSynchronize.Flag.String("b.security", "", "security.toml file for filer B when clusters use different certificates") syncOptions.debug = cmdFilerSynchronize.Flag.Bool("debug", false, "serves runtime profiling data via pprof on the port specified by -debug.port") syncOptions.debugPort = cmdFilerSynchronize.Flag.Int("debug.port", 6060, "http port for debugging") syncOptions.clientId = util.RandomInt32() @@ -144,6 +148,22 @@ func runFilerSynchronize(cmd *Command, args []string) bool { util.LoadSecurityConfiguration() grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client") + // per-filer TLS when clusters use different certificates + grpcDialOptionA := grpcDialOption + grpcDialOptionB := grpcDialOption + if *syncOptions.aSecurity != "" { + var err error + if grpcDialOptionA, err = security.LoadClientTLSFromFile(*syncOptions.aSecurity, "grpc.client"); err != nil { + glog.Fatalf("load security config for filer A: %v", err) + } + } + if *syncOptions.bSecurity != "" { + var err error + if grpcDialOptionB, err = security.LoadClientTLSFromFile(*syncOptions.bSecurity, "grpc.client"); err != nil { + glog.Fatalf("load security config for filer B: %v", err) + } + } + grace.SetupProfiling(*syncCpuProfile, *syncMemProfile) filerA := pb.ServerAddress(*syncOptions.filerA) @@ -153,13 +173,13 @@ func runFilerSynchronize(cmd *Command, args []string) bool { go statsCollect.StartMetricsServer(*syncOptions.metricsHttpIp, *syncOptions.metricsHttpPort) // read a filer signature - aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOption, filerA) + aFilerSignature, aFilerErr := replication.ReadFilerSignature(grpcDialOptionA, filerA) if aFilerErr != nil { glog.Errorf("get filer 'a' signature %d error from %s to %s: %v", aFilerSignature, *syncOptions.filerA, *syncOptions.filerB, aFilerErr) return true } // read b filer signature - bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOption, filerB) + bFilerSignature, bFilerErr := replication.ReadFilerSignature(grpcDialOptionB, filerB) if bFilerErr != nil { glog.Errorf("get filer 'b' signature %d error from %s to %s: %v", bFilerSignature, *syncOptions.filerA, *syncOptions.filerB, bFilerErr) return true @@ -189,9 +209,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { go func() { // a->b // set synchronization start timestamp to offset - initOffsetError := initOffsetFromTsMs(grpcDialOption, filerB, aFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath)) + initOffsetError := initOffsetFromTsMs(grpcDialOptionB, filerB, aFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.aPath)) if initOffsetError != nil { - glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerA, *syncOptions.filerB, initOffsetError) os.Exit(2) } for { @@ -199,11 +219,12 @@ func runFilerSynchronize(cmd *Command, args []string) bool { err := doSubscribeFilerMetaChanges( syncOptions.clientId, syncOptions.clientEpoch.Load(), - grpcDialOption, + grpcDialOptionA, filerA, *syncOptions.aPath, util.StringSplit(*syncOptions.aExcludePaths, ","), *syncOptions.aProxyByFiler, + grpcDialOptionB, filerB, *syncOptions.bPath, *syncOptions.bReplication, @@ -228,9 +249,9 @@ func runFilerSynchronize(cmd *Command, args []string) bool { if !*syncOptions.isActivePassive { // b->a // set synchronization start timestamp to offset - initOffsetError := initOffsetFromTsMs(grpcDialOption, filerA, bFilerSignature, *syncOptions.aFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath)) + initOffsetError := initOffsetFromTsMs(grpcDialOptionA, filerA, bFilerSignature, *syncOptions.bFromTsMs, getSignaturePrefixByPath(*syncOptions.bPath)) if initOffsetError != nil { - glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.aFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) + glog.Errorf("init offset from timestamp %d error from %s to %s: %v", *syncOptions.bFromTsMs, *syncOptions.filerB, *syncOptions.filerA, initOffsetError) os.Exit(2) } go func() { @@ -239,11 +260,12 @@ func runFilerSynchronize(cmd *Command, args []string) bool { err := doSubscribeFilerMetaChanges( syncOptions.clientId, syncOptions.clientEpoch.Load(), - grpcDialOption, + grpcDialOptionB, filerB, *syncOptions.bPath, util.StringSplit(*syncOptions.bExcludePaths, ","), *syncOptions.bProxyByFiler, + grpcDialOptionA, filerA, *syncOptions.aPath, *syncOptions.aReplication, @@ -285,12 +307,12 @@ func initOffsetFromTsMs(grpcDialOption grpc.DialOption, targetFiler pb.ServerAdd return nil } -func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetFiler pb.ServerAddress, targetPath string, +func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDialOption grpc.DialOption, sourceFiler pb.ServerAddress, sourcePath string, sourceExcludePaths []string, sourceReadChunkFromFiler bool, targetGrpcDialOption grpc.DialOption, targetFiler pb.ServerAddress, targetPath string, replicationStr, collection string, ttlSec int, sinkWriteChunkByFiler bool, diskType string, debug bool, concurrency int, chunkConcurrency int, doDeleteFiles bool, sourceFilerSignature int32, targetFilerSignature int32, statePtr *atomic.Pointer[syncState]) error { // if first time, start from now // if has previously synced, resume from that point of time - sourceFilerOffsetTsNs, err := getOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature) + sourceFilerOffsetTsNs, err := getOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature) if err != nil { return err } @@ -300,8 +322,9 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti // create filer sink filerSource := &source.FilerSource{} filerSource.DoInitialize(sourceFiler.ToHttpAddress(), sourceFiler.ToGrpcAddress(), sourcePath, sourceReadChunkFromFiler) + filerSource.SetGrpcDialOption(sourceGrpcDialOption) filerSink := &filersink.FilerSink{} - filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, grpcDialOption, sinkWriteChunkByFiler) + filerSink.DoInitialize(targetFiler.ToHttpAddress(), targetFiler.ToGrpcAddress(), targetPath, replicationStr, collection, ttlSec, diskType, targetGrpcDialOption, sinkWriteChunkByFiler) filerSink.SetChunkConcurrency(chunkConcurrency) filerSink.SetSourceFiler(filerSource) @@ -328,7 +351,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti if statePtr != nil { statePtr.Store(&syncState{ processor: processor, - grpcDialOption: grpcDialOption, + grpcDialOption: targetGrpcDialOption, targetFiler: targetFiler, sourcePath: sourcePath, sourceFilerSignature: sourceFilerSignature, @@ -351,7 +374,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti lastLogTsNs = now // collect synchronous offset statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) - return setOffset(grpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) + return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) }) prefix := sourcePath @@ -372,7 +395,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, grpcDialOpti EventErrorType: pb.RetryForeverOnError, } - return pb.FollowMetadata(sourceFiler, grpcDialOption, metadataFollowOption, processEventFnWithOffset) + return pb.FollowMetadata(sourceFiler, sourceGrpcDialOption, metadataFollowOption, processEventFnWithOffset) } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 0dd3ea5b9..32c2a2235 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -50,6 +50,10 @@ func (fs *FilerSource) DoInitialize(address, grpcAddress string, dir string, rea return nil } +func (fs *FilerSource) SetGrpcDialOption(option grpc.DialOption) { + fs.grpcDialOption = option +} + func (fs *FilerSource) LookupFileId(ctx context.Context, part string) (fileUrls []string, err error) { vid2Locations := make(map[string]*filer_pb.Locations) diff --git a/weed/security/tls.go b/weed/security/tls.go index bbe4bbddc..78d4e501a 100644 --- a/weed/security/tls.go +++ b/weed/security/tls.go @@ -7,10 +7,13 @@ import ( "fmt" "net" "os" + "path/filepath" "slices" "strings" "time" + "github.com/spf13/viper" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" @@ -139,6 +142,23 @@ func LoadServerTLS(config *util.ViperProxy, component string) (grpc.ServerOption return grpc.Creds(ta), nil } +func LoadClientTLSFromFile(configFile string, component string) (grpc.DialOption, error) { + v := viper.New() + v.SetConfigFile(configFile) + if err := v.ReadInConfig(); err != nil { + return nil, fmt.Errorf("failed to read security config %s: %v", configFile, err) + } + // Resolve relative PEM paths against the config file's directory. + configDir := filepath.Dir(configFile) + for _, key := range []string{"grpc.ca", component + ".cert", component + ".key"} { + p := v.GetString(key) + if p != "" && !filepath.IsAbs(p) { + v.Set(key, filepath.Join(configDir, p)) + } + } + return LoadClientTLS(&util.ViperProxy{Viper: v}, component), nil +} + func LoadClientTLS(config *util.ViperProxy, component string) grpc.DialOption { if config == nil { return grpc.WithTransportCredentials(insecure.NewCredentials())