support multiple masters

fix https://github.com/seaweedfs/seaweedfs/issues/6988
This commit is contained in:
chrislu
2025-07-15 10:51:00 -07:00
parent d78aa3d2de
commit 64c5dde2f3
5 changed files with 65 additions and 28 deletions

View File

@@ -187,10 +187,13 @@ func (s *AdminServer) getMasterNodesStatus() []MasterNode {
isLeader = false isLeader = false
} }
masterNodes = append(masterNodes, MasterNode{ currentMaster := s.masterClient.GetMaster(context.Background())
Address: s.masterAddress, if currentMaster != "" {
IsLeader: isLeader, masterNodes = append(masterNodes, MasterNode{
}) Address: string(currentMaster),
IsLeader: isLeader,
})
}
return masterNodes return masterNodes
} }
@@ -222,7 +225,8 @@ func (s *AdminServer) getFilerNodesStatus() []FilerNode {
}) })
if err != nil { if err != nil {
glog.Errorf("Failed to get filer nodes from master %s: %v", s.masterAddress, err) currentMaster := s.masterClient.GetMaster(context.Background())
glog.Errorf("Failed to get filer nodes from master %s: %v", currentMaster, err)
// Return empty list if we can't get filer info from master // Return empty list if we can't get filer info from master
return []FilerNode{} return []FilerNode{}
} }
@@ -257,7 +261,8 @@ func (s *AdminServer) getMessageBrokerNodesStatus() []MessageBrokerNode {
}) })
if err != nil { if err != nil {
glog.Errorf("Failed to get message broker nodes from master %s: %v", s.masterAddress, err) currentMaster := s.masterClient.GetMaster(context.Background())
glog.Errorf("Failed to get message broker nodes from master %s: %v", currentMaster, err)
// Return empty list if we can't get broker info from master // Return empty list if we can't get broker info from master
return []MessageBrokerNode{} return []MessageBrokerNode{}
} }

View File

@@ -14,6 +14,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
@@ -21,11 +22,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb" "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
type AdminServer struct { type AdminServer struct {
masterAddress string masterClient *wdclient.MasterClient
templateFS http.FileSystem templateFS http.FileSystem
dataDir string dataDir string
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
@@ -56,12 +58,29 @@ type AdminServer struct {
// Type definitions moved to types.go // Type definitions moved to types.go
func NewAdminServer(masterAddress string, templateFS http.FileSystem, dataDir string) *AdminServer { func NewAdminServer(masters string, templateFS http.FileSystem, dataDir string) *AdminServer {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.client")
// Create master client with multiple master support
masterClient := wdclient.NewMasterClient(
grpcDialOption,
"", // filerGroup - not needed for admin
"admin", // clientType
"", // clientHost - not needed for admin
"", // dataCenter - not needed for admin
"", // rack - not needed for admin
*pb.ServerAddresses(masters).ToServiceDiscovery(),
)
// Start master client connection process (like shell and filer do)
ctx := context.Background()
go masterClient.KeepConnectedToMaster(ctx)
server := &AdminServer{ server := &AdminServer{
masterAddress: masterAddress, masterClient: masterClient,
templateFS: templateFS, templateFS: templateFS,
dataDir: dataDir, dataDir: dataDir,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"), grpcDialOption: grpcDialOption,
cacheExpiration: 10 * time.Second, cacheExpiration: 10 * time.Second,
filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds filerCacheExpiration: 30 * time.Second, // Cache filers for 30 seconds
configPersistence: NewConfigPersistence(dataDir), configPersistence: NewConfigPersistence(dataDir),
@@ -606,7 +625,8 @@ func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
if err != nil { if err != nil {
// If gRPC call fails, log the error but continue with topology data // If gRPC call fails, log the error but continue with topology data
glog.Errorf("Failed to get raft cluster servers from master %s: %v", s.masterAddress, err) currentMaster := s.masterClient.GetMaster(context.Background())
glog.Errorf("Failed to get raft cluster servers from master %s: %v", currentMaster, err)
} }
// Convert map to slice // Convert map to slice
@@ -614,14 +634,17 @@ func (s *AdminServer) GetClusterMasters() (*ClusterMastersData, error) {
masters = append(masters, *masterInfo) masters = append(masters, *masterInfo)
} }
// If no masters found at all, add the configured master as fallback // If no masters found at all, add the current master as fallback
if len(masters) == 0 { if len(masters) == 0 {
masters = append(masters, MasterInfo{ currentMaster := s.masterClient.GetMaster(context.Background())
Address: s.masterAddress, if currentMaster != "" {
IsLeader: true, masters = append(masters, MasterInfo{
Suffrage: "Voter", Address: string(currentMaster),
}) IsLeader: true,
leaderCount = 1 Suffrage: "Voter",
})
leaderCount = 1
}
} }
return &ClusterMastersData{ return &ClusterMastersData{
@@ -1188,7 +1211,8 @@ func (as *AdminServer) GetConfigInfo(c *gin.Context) {
configInfo := as.configPersistence.GetConfigInfo() configInfo := as.configPersistence.GetConfigInfo()
// Add additional admin server info // Add additional admin server info
configInfo["master_address"] = as.masterAddress currentMaster := as.masterClient.GetMaster(context.Background())
configInfo["master_address"] = string(currentMaster)
configInfo["cache_expiration"] = as.cacheExpiration.String() configInfo["cache_expiration"] = as.cacheExpiration.String()
configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String() configInfo["filer_cache_expiration"] = as.filerCacheExpiration.String()

View File

@@ -16,11 +16,7 @@ import (
// WithMasterClient executes a function with a master client connection // WithMasterClient executes a function with a master client connection
func (s *AdminServer) WithMasterClient(f func(client master_pb.SeaweedClient) error) error { func (s *AdminServer) WithMasterClient(f func(client master_pb.SeaweedClient) error) error {
masterAddr := pb.ServerAddress(s.masterAddress) return s.masterClient.WithClient(false, f)
return pb.WithMasterClient(false, masterAddr, s.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
return f(client)
})
} }
// WithFilerClient executes a function with a filer client connection // WithFilerClient executes a function with a filer client connection
@@ -78,7 +74,8 @@ func (s *AdminServer) getDiscoveredFilers() []string {
}) })
if err != nil { if err != nil {
glog.Warningf("Failed to discover filers from master %s: %v", s.masterAddress, err) currentMaster := s.masterClient.GetMaster(context.Background())
glog.Warningf("Failed to discover filers from master %s: %v", currentMaster, err)
// Return cached filers even if expired, better than nothing // Return cached filers even if expired, better than nothing
return s.cachedFilers return s.cachedFilers
} }

View File

@@ -23,7 +23,8 @@ func (s *AdminServer) GetClusterTopology() (*ClusterTopology, error) {
// Use gRPC only // Use gRPC only
err := s.getTopologyViaGRPC(topology) err := s.getTopologyViaGRPC(topology)
if err != nil { if err != nil {
glog.Errorf("Failed to connect to master server %s: %v", s.masterAddress, err) currentMaster := s.masterClient.GetMaster(context.Background())
glog.Errorf("Failed to connect to master server %s: %v", currentMaster, err)
return nil, fmt.Errorf("gRPC topology request failed: %v", err) return nil, fmt.Errorf("gRPC topology request failed: %v", err)
} }
@@ -40,7 +41,8 @@ func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error {
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error { err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{}) resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil { if err != nil {
glog.Errorf("Failed to get volume list from master %s: %v", s.masterAddress, err) currentMaster := s.masterClient.GetMaster(context.Background())
glog.Errorf("Failed to get volume list from master %s: %v", currentMaster, err)
return err return err
} }

View File

@@ -22,6 +22,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/admin" "github.com/seaweedfs/seaweedfs/weed/admin"
"github.com/seaweedfs/seaweedfs/weed/admin/dash" "github.com/seaweedfs/seaweedfs/weed/admin/dash"
"github.com/seaweedfs/seaweedfs/weed/admin/handlers" "github.com/seaweedfs/seaweedfs/weed/admin/handlers"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/security" "github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
@@ -119,6 +120,14 @@ func runAdmin(cmd *Command, args []string) bool {
return false return false
} }
// Validate that masters string can be parsed
masterAddresses := pb.ServerAddresses(*a.masters).ToAddresses()
if len(masterAddresses) == 0 {
fmt.Println("Error: no valid master addresses found")
fmt.Println("Usage: weed admin -masters=master1:9333,master2:9333")
return false
}
// Security warnings // Security warnings
if *a.adminPassword == "" { if *a.adminPassword == "" {
fmt.Println("WARNING: Admin interface is running without authentication!") fmt.Println("WARNING: Admin interface is running without authentication!")
@@ -153,7 +162,7 @@ func runAdmin(cmd *Command, args []string) bool {
cancel() cancel()
}() }()
// Start the admin server // Start the admin server with all masters
err := startAdminServer(ctx, a) err := startAdminServer(ctx, a)
if err != nil { if err != nil {
fmt.Printf("Admin server error: %v\n", err) fmt.Printf("Admin server error: %v\n", err)