Add cluster.raft.leader.transfer command for graceful leader change (#7819)
* proto: add RaftLeadershipTransfer RPC for forced leader change

  Add new gRPC RPC and messages for leadership transfer:
  - RaftLeadershipTransferRequest: optional target_id and target_address
  - RaftLeadershipTransferResponse: previous_leader and new_leader

  This enables graceful leadership transfer before master maintenance, reducing errors in filers during planned maintenance windows.

  Ref: https://github.com/seaweedfs/seaweedfs/issues/7527

* proto: regenerate Go files for RaftLeadershipTransfer

  Generated from master.proto changes.

* master: implement RaftLeadershipTransfer gRPC handler

  Add gRPC handler for leadership transfer with support for:
  - Transfer to any eligible follower (when target_id is empty)
  - Transfer to a specific server (when target_id and target_address are provided)

  Uses hashicorp/raft LeadershipTransfer() and LeadershipTransferToServer() APIs. Returns the previous and new leader in the response.

* shell: add cluster.raft.leader.transfer command

  Add weed shell command for graceful leadership transfer:
  - Displays current cluster status before transfer
  - Supports auto-selection of target (any eligible follower)
  - Supports targeted transfer with -id and -address flags
  - Provides clear feedback on success/failure with troubleshooting tips

  Usage:
    cluster.raft.leader.transfer
    cluster.raft.leader.transfer -id <server_id> -address <grpc_address>

* master: add unit tests for raft gRPC handlers

  Add tests covering:
  - RaftLeadershipTransfer with no raft initialized
  - RaftLeadershipTransfer with target_id but no address
  - RaftListClusterServers with no raft initialized
  - RaftAddServer with no raft initialized
  - RaftRemoveServer with no raft initialized

  These tests verify error handling when raft is not configured.

* shell: add tests for cluster.raft.leader.transfer command

  Add tests covering:
  - Command name and help text validation
  - HasTag returns false for ResourceHeavy
  - Validation of -id without -address
  - Argument parsing with unknown flags

* master: clarify that leadership transfer requires -raftHashicorp

  The default raft implementation (seaweedfs/raft, a goraft fork) does not support graceful leadership transfer. This feature is only available when using hashicorp raft (-raftHashicorp=true).

  Update error messages and help text to make this requirement clear:
  - gRPC handler returns specific error for goraft users
  - Shell command help text notes the requirement
  - Added test for goraft case

* test: use strings.Contains instead of custom helper

  Replace custom contains/containsHelper functions with the standard library strings.Contains for better maintainability.

* shell: return flag parsing errors instead of swallowing them

  - Return the error from flag.Parse() instead of returning nil
  - Update test to explicitly assert error for unknown flags

* test: document integration test scenarios for Raft leadership transfer

  Add comments explaining:
  - Why these unit tests only cover 'Raft not initialized' scenarios
  - What integration tests should cover (with multi-master cluster)
  - hashicorp/raft uses concrete types that cannot be easily mocked

* fix: address reviewer feedback on tests and leader routing

  - Remove misleading tests that couldn't properly validate their documented behavior without a real Raft cluster:
    - TestRaftLeadershipTransfer_GoraftNotSupported
    - TestRaftLeadershipTransfer_ValidationTargetIdWithoutAddress
  - Change WithClient(false) to WithClient(true) for RaftLeadershipTransfer RPC to ensure the request is routed to the current leader

* Improve cluster.raft.transferLeader command

  - Rename command from cluster.raft.leader.transfer to cluster.raft.transferLeader
  - Add symmetric validation: -id and -address must be specified together
  - Handle case where same leader is re-elected after transfer
  - Add test for -address without -id validation
  - Add docker compose file for 5-master raft cluster testing
This commit is contained in:
@@ -51,6 +51,8 @@ service Seaweed {
|
||||
}
|
||||
rpc RaftRemoveServer (RaftRemoveServerRequest) returns (RaftRemoveServerResponse) {
|
||||
}
|
||||
// Unary RPC added by this commit for graceful Raft leader change:
// takes a RaftLeadershipTransferRequest and returns a RaftLeadershipTransferResponse.
rpc RaftLeadershipTransfer (RaftLeadershipTransferRequest) returns (RaftLeadershipTransferResponse) {
|
||||
}
|
||||
rpc VolumeGrow (VolumeGrowRequest) returns (VolumeGrowResponse) {
|
||||
}
|
||||
}
|
||||
@@ -443,5 +445,14 @@ message RaftListClusterServersResponse {
|
||||
repeated ClusterServers cluster_servers = 1;
|
||||
}
|
||||
|
||||
// Request for the RaftLeadershipTransfer RPC.
// Both fields are optional; leaving target_id empty lets any eligible follower take over.
message RaftLeadershipTransferRequest {
|
||||
string target_id = 1; // Optional: target server ID. If empty, transfers to any eligible follower
|
||||
string target_address = 2; // Optional: target server address. Required if target_id is specified
|
||||
}
|
||||
// Response for the RaftLeadershipTransfer RPC: reports the leader before and after the transfer.
// NOTE(review): whether these strings hold server IDs or addresses is not visible here; confirm against the handler.
message RaftLeadershipTransferResponse {
|
||||
// Leader at the time the transfer was requested.
string previous_leader = 1;
|
||||
// Leader after the transfer.
string new_leader = 2;
|
||||
}
|
||||
|
||||
message VolumeGrowResponse {
|
||||
}
|
||||
@@ -3690,6 +3690,110 @@ func (x *RaftListClusterServersResponse) GetClusterServers() []*RaftListClusterS
|
||||
return nil
|
||||
}
|
||||
|
||||
// RaftLeadershipTransferRequest is the generated Go type for the
// master_pb.RaftLeadershipTransferRequest message (two string fields:
// target_id = 1 and target_address = 2).
type RaftLeadershipTransferRequest struct {
|
||||
// state, unknownFields and sizeCache are protoimpl bookkeeping fields, not message data.
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
TargetId string `protobuf:"bytes,1,opt,name=target_id,json=targetId,proto3" json:"target_id,omitempty"` // Optional: target server ID. If empty, transfers to any eligible follower
|
||||
TargetAddress string `protobuf:"bytes,2,opt,name=target_address,json=targetAddress,proto3" json:"target_address,omitempty"` // Optional: target server address. Required if target_id is specified
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
// Reset overwrites *x with a zero-valued RaftLeadershipTransferRequest and then
// stores the message info from file_master_proto_msgTypes[58] in its message state.
func (x *RaftLeadershipTransferRequest) Reset() {
|
||||
*x = RaftLeadershipTransferRequest{}
|
||||
mi := &file_master_proto_msgTypes[58]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
// String returns the string form of x as produced by protoimpl.X.MessageStringOf.
func (x *RaftLeadershipTransferRequest) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
// ProtoMessage is an empty method with no behavior.
// NOTE(review): presumably a marker that tags the type as a protobuf message; confirm against the protobuf-go docs.
func (*RaftLeadershipTransferRequest) ProtoMessage() {}
|
||||
|
||||
// ProtoReflect returns the protoreflect.Message view of x.
// For a non-nil x it returns x's message state, first storing the message info
// from file_master_proto_msgTypes[58] if none has been loaded yet.
// For a nil x it returns mi.MessageOf(x).
func (x *RaftLeadershipTransferRequest) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[58]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
// Store the message info only when the state does not have it yet.
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Descriptor returns the result of file_master_proto_rawDescGZIP() together
// with the index path []int{58} for this message.
//
// Deprecated: Use RaftLeadershipTransferRequest.ProtoReflect.Descriptor instead.
|
||||
func (*RaftLeadershipTransferRequest) Descriptor() ([]byte, []int) {
|
||||
return file_master_proto_rawDescGZIP(), []int{58}
|
||||
}
|
||||
|
||||
// GetTargetId returns x.TargetId, or the empty string when x is nil.
func (x *RaftLeadershipTransferRequest) GetTargetId() string {
|
||||
if x != nil {
|
||||
return x.TargetId
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// GetTargetAddress returns x.TargetAddress, or the empty string when x is nil.
func (x *RaftLeadershipTransferRequest) GetTargetAddress() string {
|
||||
if x != nil {
|
||||
return x.TargetAddress
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// RaftLeadershipTransferResponse is the generated Go type for the
// master_pb.RaftLeadershipTransferResponse message (two string fields:
// previous_leader = 1 and new_leader = 2).
type RaftLeadershipTransferResponse struct {
|
||||
// state, unknownFields and sizeCache are protoimpl bookkeeping fields, not message data.
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
PreviousLeader string `protobuf:"bytes,1,opt,name=previous_leader,json=previousLeader,proto3" json:"previous_leader,omitempty"`
|
||||
NewLeader string `protobuf:"bytes,2,opt,name=new_leader,json=newLeader,proto3" json:"new_leader,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
|
||||
// Reset overwrites *x with a zero-valued RaftLeadershipTransferResponse and then
// stores the message info from file_master_proto_msgTypes[59] in its message state.
func (x *RaftLeadershipTransferResponse) Reset() {
|
||||
*x = RaftLeadershipTransferResponse{}
|
||||
mi := &file_master_proto_msgTypes[59]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
|
||||
// String returns the string form of x as produced by protoimpl.X.MessageStringOf.
func (x *RaftLeadershipTransferResponse) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
// ProtoMessage is an empty method with no behavior.
// NOTE(review): presumably a marker that tags the type as a protobuf message; confirm against the protobuf-go docs.
func (*RaftLeadershipTransferResponse) ProtoMessage() {}
|
||||
|
||||
// ProtoReflect returns the protoreflect.Message view of x.
// For a non-nil x it returns x's message state, first storing the message info
// from file_master_proto_msgTypes[59] if none has been loaded yet.
// For a nil x it returns mi.MessageOf(x).
func (x *RaftLeadershipTransferResponse) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[59]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
// Store the message info only when the state does not have it yet.
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Descriptor returns the result of file_master_proto_rawDescGZIP() together
// with the index path []int{59} for this message.
//
// Deprecated: Use RaftLeadershipTransferResponse.ProtoReflect.Descriptor instead.
|
||||
func (*RaftLeadershipTransferResponse) Descriptor() ([]byte, []int) {
|
||||
return file_master_proto_rawDescGZIP(), []int{59}
|
||||
}
|
||||
|
||||
// GetPreviousLeader returns x.PreviousLeader, or the empty string when x is nil.
func (x *RaftLeadershipTransferResponse) GetPreviousLeader() string {
|
||||
if x != nil {
|
||||
return x.PreviousLeader
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// GetNewLeader returns x.NewLeader, or the empty string when x is nil.
func (x *RaftLeadershipTransferResponse) GetNewLeader() string {
|
||||
if x != nil {
|
||||
return x.NewLeader
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type VolumeGrowResponse struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
@@ -3698,7 +3802,7 @@ type VolumeGrowResponse struct {
|
||||
|
||||
func (x *VolumeGrowResponse) Reset() {
|
||||
*x = VolumeGrowResponse{}
|
||||
mi := &file_master_proto_msgTypes[58]
|
||||
mi := &file_master_proto_msgTypes[60]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -3710,7 +3814,7 @@ func (x *VolumeGrowResponse) String() string {
|
||||
func (*VolumeGrowResponse) ProtoMessage() {}
|
||||
|
||||
func (x *VolumeGrowResponse) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[58]
|
||||
mi := &file_master_proto_msgTypes[60]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -3723,7 +3827,7 @@ func (x *VolumeGrowResponse) ProtoReflect() protoreflect.Message {
|
||||
|
||||
// Deprecated: Use VolumeGrowResponse.ProtoReflect.Descriptor instead.
|
||||
func (*VolumeGrowResponse) Descriptor() ([]byte, []int) {
|
||||
return file_master_proto_rawDescGZIP(), []int{58}
|
||||
return file_master_proto_rawDescGZIP(), []int{60}
|
||||
}
|
||||
|
||||
type SuperBlockExtra_ErasureCoding struct {
|
||||
@@ -3737,7 +3841,7 @@ type SuperBlockExtra_ErasureCoding struct {
|
||||
|
||||
func (x *SuperBlockExtra_ErasureCoding) Reset() {
|
||||
*x = SuperBlockExtra_ErasureCoding{}
|
||||
mi := &file_master_proto_msgTypes[61]
|
||||
mi := &file_master_proto_msgTypes[63]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -3749,7 +3853,7 @@ func (x *SuperBlockExtra_ErasureCoding) String() string {
|
||||
func (*SuperBlockExtra_ErasureCoding) ProtoMessage() {}
|
||||
|
||||
func (x *SuperBlockExtra_ErasureCoding) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[61]
|
||||
mi := &file_master_proto_msgTypes[63]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -3798,7 +3902,7 @@ type LookupVolumeResponse_VolumeIdLocation struct {
|
||||
|
||||
func (x *LookupVolumeResponse_VolumeIdLocation) Reset() {
|
||||
*x = LookupVolumeResponse_VolumeIdLocation{}
|
||||
mi := &file_master_proto_msgTypes[62]
|
||||
mi := &file_master_proto_msgTypes[64]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -3810,7 +3914,7 @@ func (x *LookupVolumeResponse_VolumeIdLocation) String() string {
|
||||
func (*LookupVolumeResponse_VolumeIdLocation) ProtoMessage() {}
|
||||
|
||||
func (x *LookupVolumeResponse_VolumeIdLocation) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[62]
|
||||
mi := &file_master_proto_msgTypes[64]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -3864,7 +3968,7 @@ type LookupEcVolumeResponse_EcShardIdLocation struct {
|
||||
|
||||
func (x *LookupEcVolumeResponse_EcShardIdLocation) Reset() {
|
||||
*x = LookupEcVolumeResponse_EcShardIdLocation{}
|
||||
mi := &file_master_proto_msgTypes[67]
|
||||
mi := &file_master_proto_msgTypes[69]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -3876,7 +3980,7 @@ func (x *LookupEcVolumeResponse_EcShardIdLocation) String() string {
|
||||
func (*LookupEcVolumeResponse_EcShardIdLocation) ProtoMessage() {}
|
||||
|
||||
func (x *LookupEcVolumeResponse_EcShardIdLocation) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[67]
|
||||
mi := &file_master_proto_msgTypes[69]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -3919,7 +4023,7 @@ type ListClusterNodesResponse_ClusterNode struct {
|
||||
|
||||
func (x *ListClusterNodesResponse_ClusterNode) Reset() {
|
||||
*x = ListClusterNodesResponse_ClusterNode{}
|
||||
mi := &file_master_proto_msgTypes[68]
|
||||
mi := &file_master_proto_msgTypes[70]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -3931,7 +4035,7 @@ func (x *ListClusterNodesResponse_ClusterNode) String() string {
|
||||
func (*ListClusterNodesResponse_ClusterNode) ProtoMessage() {}
|
||||
|
||||
func (x *ListClusterNodesResponse_ClusterNode) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[68]
|
||||
mi := &file_master_proto_msgTypes[70]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -3994,7 +4098,7 @@ type RaftListClusterServersResponse_ClusterServers struct {
|
||||
|
||||
func (x *RaftListClusterServersResponse_ClusterServers) Reset() {
|
||||
*x = RaftListClusterServersResponse_ClusterServers{}
|
||||
mi := &file_master_proto_msgTypes[69]
|
||||
mi := &file_master_proto_msgTypes[71]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
@@ -4006,7 +4110,7 @@ func (x *RaftListClusterServersResponse_ClusterServers) String() string {
|
||||
func (*RaftListClusterServersResponse_ClusterServers) ProtoMessage() {}
|
||||
|
||||
func (x *RaftListClusterServersResponse_ClusterServers) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_master_proto_msgTypes[69]
|
||||
mi := &file_master_proto_msgTypes[71]
|
||||
if x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
@@ -4405,8 +4509,15 @@ const file_master_proto_rawDesc = "" +
|
||||
"\x02id\x18\x01 \x01(\tR\x02id\x12\x18\n" +
|
||||
"\aaddress\x18\x02 \x01(\tR\aaddress\x12\x1a\n" +
|
||||
"\bsuffrage\x18\x03 \x01(\tR\bsuffrage\x12\x1a\n" +
|
||||
"\bisLeader\x18\x04 \x01(\bR\bisLeader\"\x14\n" +
|
||||
"\x12VolumeGrowResponse2\xd5\x0f\n" +
|
||||
"\bisLeader\x18\x04 \x01(\bR\bisLeader\"c\n" +
|
||||
"\x1dRaftLeadershipTransferRequest\x12\x1b\n" +
|
||||
"\ttarget_id\x18\x01 \x01(\tR\btargetId\x12%\n" +
|
||||
"\x0etarget_address\x18\x02 \x01(\tR\rtargetAddress\"h\n" +
|
||||
"\x1eRaftLeadershipTransferResponse\x12'\n" +
|
||||
"\x0fprevious_leader\x18\x01 \x01(\tR\x0epreviousLeader\x12\x1d\n" +
|
||||
"\n" +
|
||||
"new_leader\x18\x02 \x01(\tR\tnewLeader\"\x14\n" +
|
||||
"\x12VolumeGrowResponse2\xc6\x10\n" +
|
||||
"\aSeaweed\x12I\n" +
|
||||
"\rSendHeartbeat\x12\x14.master_pb.Heartbeat\x1a\x1c.master_pb.HeartbeatResponse\"\x00(\x010\x01\x12X\n" +
|
||||
"\rKeepConnected\x12\x1f.master_pb.KeepConnectedRequest\x1a .master_pb.KeepConnectedResponse\"\x00(\x010\x01\x12Q\n" +
|
||||
@@ -4431,7 +4542,8 @@ const file_master_proto_rawDesc = "" +
|
||||
"\x04Ping\x12\x16.master_pb.PingRequest\x1a\x17.master_pb.PingResponse\"\x00\x12o\n" +
|
||||
"\x16RaftListClusterServers\x12(.master_pb.RaftListClusterServersRequest\x1a).master_pb.RaftListClusterServersResponse\"\x00\x12T\n" +
|
||||
"\rRaftAddServer\x12\x1f.master_pb.RaftAddServerRequest\x1a .master_pb.RaftAddServerResponse\"\x00\x12]\n" +
|
||||
"\x10RaftRemoveServer\x12\".master_pb.RaftRemoveServerRequest\x1a#.master_pb.RaftRemoveServerResponse\"\x00\x12K\n" +
|
||||
"\x10RaftRemoveServer\x12\".master_pb.RaftRemoveServerRequest\x1a#.master_pb.RaftRemoveServerResponse\"\x00\x12o\n" +
|
||||
"\x16RaftLeadershipTransfer\x12(.master_pb.RaftLeadershipTransferRequest\x1a).master_pb.RaftLeadershipTransferResponse\"\x00\x12K\n" +
|
||||
"\n" +
|
||||
"VolumeGrow\x12\x1c.master_pb.VolumeGrowRequest\x1a\x1d.master_pb.VolumeGrowResponse\"\x00B2Z0github.com/seaweedfs/seaweedfs/weed/pb/master_pbb\x06proto3"
|
||||
|
||||
@@ -4447,7 +4559,7 @@ func file_master_proto_rawDescGZIP() []byte {
|
||||
return file_master_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_master_proto_msgTypes = make([]protoimpl.MessageInfo, 70)
|
||||
var file_master_proto_msgTypes = make([]protoimpl.MessageInfo, 72)
|
||||
var file_master_proto_goTypes = []any{
|
||||
(*Heartbeat)(nil), // 0: master_pb.Heartbeat
|
||||
(*HeartbeatResponse)(nil), // 1: master_pb.HeartbeatResponse
|
||||
@@ -4507,18 +4619,20 @@ var file_master_proto_goTypes = []any{
|
||||
(*RaftRemoveServerResponse)(nil), // 55: master_pb.RaftRemoveServerResponse
|
||||
(*RaftListClusterServersRequest)(nil), // 56: master_pb.RaftListClusterServersRequest
|
||||
(*RaftListClusterServersResponse)(nil), // 57: master_pb.RaftListClusterServersResponse
|
||||
(*VolumeGrowResponse)(nil), // 58: master_pb.VolumeGrowResponse
|
||||
nil, // 59: master_pb.Heartbeat.MaxVolumeCountsEntry
|
||||
nil, // 60: master_pb.StorageBackend.PropertiesEntry
|
||||
(*SuperBlockExtra_ErasureCoding)(nil), // 61: master_pb.SuperBlockExtra.ErasureCoding
|
||||
(*LookupVolumeResponse_VolumeIdLocation)(nil), // 62: master_pb.LookupVolumeResponse.VolumeIdLocation
|
||||
nil, // 63: master_pb.DataNodeInfo.DiskInfosEntry
|
||||
nil, // 64: master_pb.RackInfo.DiskInfosEntry
|
||||
nil, // 65: master_pb.DataCenterInfo.DiskInfosEntry
|
||||
nil, // 66: master_pb.TopologyInfo.DiskInfosEntry
|
||||
(*LookupEcVolumeResponse_EcShardIdLocation)(nil), // 67: master_pb.LookupEcVolumeResponse.EcShardIdLocation
|
||||
(*ListClusterNodesResponse_ClusterNode)(nil), // 68: master_pb.ListClusterNodesResponse.ClusterNode
|
||||
(*RaftListClusterServersResponse_ClusterServers)(nil), // 69: master_pb.RaftListClusterServersResponse.ClusterServers
|
||||
(*RaftLeadershipTransferRequest)(nil), // 58: master_pb.RaftLeadershipTransferRequest
|
||||
(*RaftLeadershipTransferResponse)(nil), // 59: master_pb.RaftLeadershipTransferResponse
|
||||
(*VolumeGrowResponse)(nil), // 60: master_pb.VolumeGrowResponse
|
||||
nil, // 61: master_pb.Heartbeat.MaxVolumeCountsEntry
|
||||
nil, // 62: master_pb.StorageBackend.PropertiesEntry
|
||||
(*SuperBlockExtra_ErasureCoding)(nil), // 63: master_pb.SuperBlockExtra.ErasureCoding
|
||||
(*LookupVolumeResponse_VolumeIdLocation)(nil), // 64: master_pb.LookupVolumeResponse.VolumeIdLocation
|
||||
nil, // 65: master_pb.DataNodeInfo.DiskInfosEntry
|
||||
nil, // 66: master_pb.RackInfo.DiskInfosEntry
|
||||
nil, // 67: master_pb.DataCenterInfo.DiskInfosEntry
|
||||
nil, // 68: master_pb.TopologyInfo.DiskInfosEntry
|
||||
(*LookupEcVolumeResponse_EcShardIdLocation)(nil), // 69: master_pb.LookupEcVolumeResponse.EcShardIdLocation
|
||||
(*ListClusterNodesResponse_ClusterNode)(nil), // 70: master_pb.ListClusterNodesResponse.ClusterNode
|
||||
(*RaftListClusterServersResponse_ClusterServers)(nil), // 71: master_pb.RaftListClusterServersResponse.ClusterServers
|
||||
}
|
||||
var file_master_proto_depIdxs = []int32{
|
||||
2, // 0: master_pb.Heartbeat.volumes:type_name -> master_pb.VolumeInformationMessage
|
||||
@@ -4527,30 +4641,30 @@ var file_master_proto_depIdxs = []int32{
|
||||
4, // 3: master_pb.Heartbeat.ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage
|
||||
4, // 4: master_pb.Heartbeat.new_ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage
|
||||
4, // 5: master_pb.Heartbeat.deleted_ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage
|
||||
59, // 6: master_pb.Heartbeat.max_volume_counts:type_name -> master_pb.Heartbeat.MaxVolumeCountsEntry
|
||||
61, // 6: master_pb.Heartbeat.max_volume_counts:type_name -> master_pb.Heartbeat.MaxVolumeCountsEntry
|
||||
5, // 7: master_pb.HeartbeatResponse.storage_backends:type_name -> master_pb.StorageBackend
|
||||
60, // 8: master_pb.StorageBackend.properties:type_name -> master_pb.StorageBackend.PropertiesEntry
|
||||
61, // 9: master_pb.SuperBlockExtra.erasure_coding:type_name -> master_pb.SuperBlockExtra.ErasureCoding
|
||||
62, // 8: master_pb.StorageBackend.properties:type_name -> master_pb.StorageBackend.PropertiesEntry
|
||||
63, // 9: master_pb.SuperBlockExtra.erasure_coding:type_name -> master_pb.SuperBlockExtra.ErasureCoding
|
||||
9, // 10: master_pb.KeepConnectedResponse.volume_location:type_name -> master_pb.VolumeLocation
|
||||
10, // 11: master_pb.KeepConnectedResponse.cluster_node_update:type_name -> master_pb.ClusterNodeUpdate
|
||||
62, // 12: master_pb.LookupVolumeResponse.volume_id_locations:type_name -> master_pb.LookupVolumeResponse.VolumeIdLocation
|
||||
64, // 12: master_pb.LookupVolumeResponse.volume_id_locations:type_name -> master_pb.LookupVolumeResponse.VolumeIdLocation
|
||||
14, // 13: master_pb.AssignResponse.replicas:type_name -> master_pb.Location
|
||||
14, // 14: master_pb.AssignResponse.location:type_name -> master_pb.Location
|
||||
20, // 15: master_pb.CollectionListResponse.collections:type_name -> master_pb.Collection
|
||||
2, // 16: master_pb.DiskInfo.volume_infos:type_name -> master_pb.VolumeInformationMessage
|
||||
4, // 17: master_pb.DiskInfo.ec_shard_infos:type_name -> master_pb.VolumeEcShardInformationMessage
|
||||
63, // 18: master_pb.DataNodeInfo.diskInfos:type_name -> master_pb.DataNodeInfo.DiskInfosEntry
|
||||
65, // 18: master_pb.DataNodeInfo.diskInfos:type_name -> master_pb.DataNodeInfo.DiskInfosEntry
|
||||
26, // 19: master_pb.RackInfo.data_node_infos:type_name -> master_pb.DataNodeInfo
|
||||
64, // 20: master_pb.RackInfo.diskInfos:type_name -> master_pb.RackInfo.DiskInfosEntry
|
||||
66, // 20: master_pb.RackInfo.diskInfos:type_name -> master_pb.RackInfo.DiskInfosEntry
|
||||
27, // 21: master_pb.DataCenterInfo.rack_infos:type_name -> master_pb.RackInfo
|
||||
65, // 22: master_pb.DataCenterInfo.diskInfos:type_name -> master_pb.DataCenterInfo.DiskInfosEntry
|
||||
67, // 22: master_pb.DataCenterInfo.diskInfos:type_name -> master_pb.DataCenterInfo.DiskInfosEntry
|
||||
28, // 23: master_pb.TopologyInfo.data_center_infos:type_name -> master_pb.DataCenterInfo
|
||||
66, // 24: master_pb.TopologyInfo.diskInfos:type_name -> master_pb.TopologyInfo.DiskInfosEntry
|
||||
68, // 24: master_pb.TopologyInfo.diskInfos:type_name -> master_pb.TopologyInfo.DiskInfosEntry
|
||||
29, // 25: master_pb.VolumeListResponse.topology_info:type_name -> master_pb.TopologyInfo
|
||||
67, // 26: master_pb.LookupEcVolumeResponse.shard_id_locations:type_name -> master_pb.LookupEcVolumeResponse.EcShardIdLocation
|
||||
69, // 26: master_pb.LookupEcVolumeResponse.shard_id_locations:type_name -> master_pb.LookupEcVolumeResponse.EcShardIdLocation
|
||||
5, // 27: master_pb.GetMasterConfigurationResponse.storage_backends:type_name -> master_pb.StorageBackend
|
||||
68, // 28: master_pb.ListClusterNodesResponse.cluster_nodes:type_name -> master_pb.ListClusterNodesResponse.ClusterNode
|
||||
69, // 29: master_pb.RaftListClusterServersResponse.cluster_servers:type_name -> master_pb.RaftListClusterServersResponse.ClusterServers
|
||||
70, // 28: master_pb.ListClusterNodesResponse.cluster_nodes:type_name -> master_pb.ListClusterNodesResponse.ClusterNode
|
||||
71, // 29: master_pb.RaftListClusterServersResponse.cluster_servers:type_name -> master_pb.RaftListClusterServersResponse.ClusterServers
|
||||
14, // 30: master_pb.LookupVolumeResponse.VolumeIdLocation.locations:type_name -> master_pb.Location
|
||||
25, // 31: master_pb.DataNodeInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
|
||||
25, // 32: master_pb.RackInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
|
||||
@@ -4579,32 +4693,34 @@ var file_master_proto_depIdxs = []int32{
|
||||
56, // 55: master_pb.Seaweed.RaftListClusterServers:input_type -> master_pb.RaftListClusterServersRequest
|
||||
52, // 56: master_pb.Seaweed.RaftAddServer:input_type -> master_pb.RaftAddServerRequest
|
||||
54, // 57: master_pb.Seaweed.RaftRemoveServer:input_type -> master_pb.RaftRemoveServerRequest
|
||||
16, // 58: master_pb.Seaweed.VolumeGrow:input_type -> master_pb.VolumeGrowRequest
|
||||
1, // 59: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse
|
||||
11, // 60: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse
|
||||
13, // 61: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse
|
||||
17, // 62: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse
|
||||
17, // 63: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse
|
||||
19, // 64: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse
|
||||
22, // 65: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse
|
||||
24, // 66: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse
|
||||
31, // 67: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse
|
||||
33, // 68: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse
|
||||
35, // 69: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse
|
||||
37, // 70: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse
|
||||
39, // 71: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse
|
||||
41, // 72: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse
|
||||
43, // 73: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse
|
||||
45, // 74: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse
|
||||
47, // 75: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse
|
||||
49, // 76: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse
|
||||
51, // 77: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse
|
||||
57, // 78: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse
|
||||
53, // 79: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse
|
||||
55, // 80: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse
|
||||
58, // 81: master_pb.Seaweed.VolumeGrow:output_type -> master_pb.VolumeGrowResponse
|
||||
59, // [59:82] is the sub-list for method output_type
|
||||
36, // [36:59] is the sub-list for method input_type
|
||||
58, // 58: master_pb.Seaweed.RaftLeadershipTransfer:input_type -> master_pb.RaftLeadershipTransferRequest
|
||||
16, // 59: master_pb.Seaweed.VolumeGrow:input_type -> master_pb.VolumeGrowRequest
|
||||
1, // 60: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse
|
||||
11, // 61: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse
|
||||
13, // 62: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse
|
||||
17, // 63: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse
|
||||
17, // 64: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse
|
||||
19, // 65: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse
|
||||
22, // 66: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse
|
||||
24, // 67: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse
|
||||
31, // 68: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse
|
||||
33, // 69: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse
|
||||
35, // 70: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse
|
||||
37, // 71: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse
|
||||
39, // 72: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse
|
||||
41, // 73: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse
|
||||
43, // 74: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse
|
||||
45, // 75: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse
|
||||
47, // 76: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse
|
||||
49, // 77: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse
|
||||
51, // 78: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse
|
||||
57, // 79: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse
|
||||
53, // 80: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse
|
||||
55, // 81: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse
|
||||
59, // 82: master_pb.Seaweed.RaftLeadershipTransfer:output_type -> master_pb.RaftLeadershipTransferResponse
|
||||
60, // 83: master_pb.Seaweed.VolumeGrow:output_type -> master_pb.VolumeGrowResponse
|
||||
60, // [60:84] is the sub-list for method output_type
|
||||
36, // [36:60] is the sub-list for method input_type
|
||||
36, // [36:36] is the sub-list for extension type_name
|
||||
36, // [36:36] is the sub-list for extension extendee
|
||||
0, // [0:36] is the sub-list for field type_name
|
||||
@@ -4621,7 +4737,7 @@ func file_master_proto_init() {
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: unsafe.Slice(unsafe.StringData(file_master_proto_rawDesc), len(file_master_proto_rawDesc)),
|
||||
NumEnums: 0,
|
||||
NumMessages: 70,
|
||||
NumMessages: 72,
|
||||
NumExtensions: 0,
|
||||
NumServices: 1,
|
||||
},
|
||||
|
||||
@@ -41,6 +41,7 @@ const (
|
||||
Seaweed_RaftListClusterServers_FullMethodName = "/master_pb.Seaweed/RaftListClusterServers"
|
||||
Seaweed_RaftAddServer_FullMethodName = "/master_pb.Seaweed/RaftAddServer"
|
||||
Seaweed_RaftRemoveServer_FullMethodName = "/master_pb.Seaweed/RaftRemoveServer"
|
||||
Seaweed_RaftLeadershipTransfer_FullMethodName = "/master_pb.Seaweed/RaftLeadershipTransfer"
|
||||
Seaweed_VolumeGrow_FullMethodName = "/master_pb.Seaweed/VolumeGrow"
|
||||
)
|
||||
|
||||
@@ -70,6 +71,7 @@ type SeaweedClient interface {
|
||||
RaftListClusterServers(ctx context.Context, in *RaftListClusterServersRequest, opts ...grpc.CallOption) (*RaftListClusterServersResponse, error)
|
||||
RaftAddServer(ctx context.Context, in *RaftAddServerRequest, opts ...grpc.CallOption) (*RaftAddServerResponse, error)
|
||||
RaftRemoveServer(ctx context.Context, in *RaftRemoveServerRequest, opts ...grpc.CallOption) (*RaftRemoveServerResponse, error)
|
||||
RaftLeadershipTransfer(ctx context.Context, in *RaftLeadershipTransferRequest, opts ...grpc.CallOption) (*RaftLeadershipTransferResponse, error)
|
||||
VolumeGrow(ctx context.Context, in *VolumeGrowRequest, opts ...grpc.CallOption) (*VolumeGrowResponse, error)
|
||||
}
|
||||
|
||||
@@ -310,6 +312,16 @@ func (c *seaweedClient) RaftRemoveServer(ctx context.Context, in *RaftRemoveServ
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedClient) RaftLeadershipTransfer(ctx context.Context, in *RaftLeadershipTransferRequest, opts ...grpc.CallOption) (*RaftLeadershipTransferResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(RaftLeadershipTransferResponse)
|
||||
err := c.cc.Invoke(ctx, Seaweed_RaftLeadershipTransfer_FullMethodName, in, out, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *seaweedClient) VolumeGrow(ctx context.Context, in *VolumeGrowRequest, opts ...grpc.CallOption) (*VolumeGrowResponse, error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
out := new(VolumeGrowResponse)
|
||||
@@ -346,6 +358,7 @@ type SeaweedServer interface {
|
||||
RaftListClusterServers(context.Context, *RaftListClusterServersRequest) (*RaftListClusterServersResponse, error)
|
||||
RaftAddServer(context.Context, *RaftAddServerRequest) (*RaftAddServerResponse, error)
|
||||
RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error)
|
||||
RaftLeadershipTransfer(context.Context, *RaftLeadershipTransferRequest) (*RaftLeadershipTransferResponse, error)
|
||||
VolumeGrow(context.Context, *VolumeGrowRequest) (*VolumeGrowResponse, error)
|
||||
mustEmbedUnimplementedSeaweedServer()
|
||||
}
|
||||
@@ -423,6 +436,9 @@ func (UnimplementedSeaweedServer) RaftAddServer(context.Context, *RaftAddServerR
|
||||
func (UnimplementedSeaweedServer) RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method RaftRemoveServer not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedServer) RaftLeadershipTransfer(context.Context, *RaftLeadershipTransferRequest) (*RaftLeadershipTransferResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method RaftLeadershipTransfer not implemented")
|
||||
}
|
||||
func (UnimplementedSeaweedServer) VolumeGrow(context.Context, *VolumeGrowRequest) (*VolumeGrowResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method VolumeGrow not implemented")
|
||||
}
|
||||
@@ -810,6 +826,24 @@ func _Seaweed_RaftRemoveServer_Handler(srv interface{}, ctx context.Context, dec
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Seaweed_RaftLeadershipTransfer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(RaftLeadershipTransferRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(SeaweedServer).RaftLeadershipTransfer(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: Seaweed_RaftLeadershipTransfer_FullMethodName,
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(SeaweedServer).RaftLeadershipTransfer(ctx, req.(*RaftLeadershipTransferRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _Seaweed_VolumeGrow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(VolumeGrowRequest)
|
||||
if err := dec(in); err != nil {
|
||||
@@ -911,6 +945,10 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "RaftRemoveServer",
|
||||
Handler: _Seaweed_RaftRemoveServer_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "RaftLeadershipTransfer",
|
||||
Handler: _Seaweed_RaftLeadershipTransfer_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "VolumeGrow",
|
||||
Handler: _Seaweed_VolumeGrow_Handler,
|
||||
|
||||
@@ -90,3 +90,50 @@ func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.Raf
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ms *MasterServer) RaftLeadershipTransfer(ctx context.Context, req *master_pb.RaftLeadershipTransferRequest) (*master_pb.RaftLeadershipTransferResponse, error) {
|
||||
resp := &master_pb.RaftLeadershipTransferResponse{}
|
||||
|
||||
ms.Topo.RaftServerAccessLock.RLock()
|
||||
defer ms.Topo.RaftServerAccessLock.RUnlock()
|
||||
|
||||
// Leadership transfer is only supported with hashicorp raft (-raftHashicorp=true)
|
||||
// The default seaweedfs/raft (goraft) implementation does not support this feature
|
||||
if ms.Topo.HashicorpRaft == nil {
|
||||
if ms.Topo.RaftServer != nil {
|
||||
return nil, fmt.Errorf("leadership transfer requires -raftHashicorp=true; the default raft implementation does not support this feature")
|
||||
}
|
||||
return nil, fmt.Errorf("raft not initialized (single master mode)")
|
||||
}
|
||||
|
||||
if ms.Topo.HashicorpRaft.State() != raft.Leader {
|
||||
leaderAddr, _ := ms.Topo.HashicorpRaft.LeaderWithID()
|
||||
return nil, fmt.Errorf("this server is not the leader; current leader is %s", leaderAddr)
|
||||
}
|
||||
|
||||
// Record previous leader
|
||||
_, previousLeaderId := ms.Topo.HashicorpRaft.LeaderWithID()
|
||||
resp.PreviousLeader = string(previousLeaderId)
|
||||
|
||||
var future raft.Future
|
||||
if req.TargetId != "" && req.TargetAddress != "" {
|
||||
// Transfer to specific server
|
||||
future = ms.Topo.HashicorpRaft.LeadershipTransferToServer(
|
||||
raft.ServerID(req.TargetId),
|
||||
raft.ServerAddress(req.TargetAddress),
|
||||
)
|
||||
} else {
|
||||
// Transfer to any eligible follower
|
||||
future = ms.Topo.HashicorpRaft.LeadershipTransfer()
|
||||
}
|
||||
|
||||
if err := future.Error(); err != nil {
|
||||
return nil, fmt.Errorf("leadership transfer failed: %v", err)
|
||||
}
|
||||
|
||||
// Get new leader info
|
||||
_, newLeaderId := ms.Topo.HashicorpRaft.LeaderWithID()
|
||||
resp.NewLeader = string(newLeaderId)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
109
weed/server/master_grpc_server_raft_test.go
Normal file
109
weed/server/master_grpc_server_raft_test.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package weed_server
|
||||
|
||||
// These tests cover the Raft gRPC handlers in scenarios where Raft is not initialized
|
||||
// (single master mode). Testing with an initialized Raft cluster requires integration
|
||||
// tests with a multi-master setup, as hashicorp/raft uses concrete types that cannot
|
||||
// be easily mocked.
|
||||
//
|
||||
// Integration tests for RaftLeadershipTransfer should cover:
|
||||
// - Successful leadership transfer to any follower (auto-selection)
|
||||
// - Successful leadership transfer to a specific target server
|
||||
// - Error when caller is not the current leader
|
||||
// - Error when target server is not a voting member
|
||||
// - Error when target server is unreachable
|
||||
//
|
||||
// These scenarios are best tested with test/multi_master/ integration tests
|
||||
// using a real 3-node master cluster with -raftHashicorp=true.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/topology"
|
||||
)
|
||||
|
||||
func TestRaftLeadershipTransfer_NoRaft(t *testing.T) {
|
||||
// Test case: raft not initialized (single master mode)
|
||||
ms := &MasterServer{
|
||||
Topo: topology.NewTopology("test", nil, 0, 0, false),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := &master_pb.RaftLeadershipTransferRequest{}
|
||||
|
||||
_, err := ms.RaftLeadershipTransfer(ctx, req)
|
||||
if err == nil {
|
||||
t.Error("expected error when raft is not initialized")
|
||||
}
|
||||
|
||||
expectedMsg := "single master mode"
|
||||
if err != nil && !strings.Contains(err.Error(), expectedMsg) {
|
||||
t.Errorf("expected error message to contain %q, got %q", expectedMsg, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftListClusterServers_NoRaft(t *testing.T) {
|
||||
// Test case: raft not initialized returns empty response
|
||||
ms := &MasterServer{
|
||||
Topo: topology.NewTopology("test", nil, 0, 0, false),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := &master_pb.RaftListClusterServersRequest{}
|
||||
|
||||
resp, err := ms.RaftListClusterServers(ctx, req)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if resp == nil {
|
||||
t.Error("expected non-nil response")
|
||||
}
|
||||
if len(resp.ClusterServers) != 0 {
|
||||
t.Errorf("expected empty cluster servers, got %d", len(resp.ClusterServers))
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftAddServer_NoRaft(t *testing.T) {
|
||||
// Test case: raft not initialized returns empty response
|
||||
ms := &MasterServer{
|
||||
Topo: topology.NewTopology("test", nil, 0, 0, false),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := &master_pb.RaftAddServerRequest{
|
||||
Id: "test-server",
|
||||
Address: "localhost:19333",
|
||||
Voter: true,
|
||||
}
|
||||
|
||||
resp, err := ms.RaftAddServer(ctx, req)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if resp == nil {
|
||||
t.Error("expected non-nil response")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftRemoveServer_NoRaft(t *testing.T) {
|
||||
// Test case: raft not initialized returns empty response
|
||||
ms := &MasterServer{
|
||||
Topo: topology.NewTopology("test", nil, 0, 0, false),
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := &master_pb.RaftRemoveServerRequest{
|
||||
Id: "test-server",
|
||||
Force: true,
|
||||
}
|
||||
|
||||
resp, err := ms.RaftRemoveServer(ctx, req)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if resp == nil {
|
||||
t.Error("expected non-nil response")
|
||||
}
|
||||
}
|
||||
144
weed/shell/command_cluster_raft_leader_transfer.go
Normal file
144
weed/shell/command_cluster_raft_leader_transfer.go
Normal file
@@ -0,0 +1,144 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||
)
|
||||
|
||||
func init() {
|
||||
Commands = append(Commands, &commandRaftLeaderTransfer{})
|
||||
}
|
||||
|
||||
// commandRaftLeaderTransfer implements the "cluster.raft.transferLeader" shell
// command. It carries no state of its own.
type commandRaftLeaderTransfer struct{}
|
||||
|
||||
func (c *commandRaftLeaderTransfer) Name() string {
|
||||
return "cluster.raft.transferLeader"
|
||||
}
|
||||
|
||||
// Help returns the usage text for the command: a one-line summary, a short
// description of when to use it, invocation examples with and without an
// explicit target, and notes on its requirements.
func (c *commandRaftLeaderTransfer) Help() string {
|
||||
return `transfer raft leadership to another master server
|
||||
|
||||
This command initiates a graceful leadership transfer from the current
|
||||
leader to another server. Use this before performing maintenance on
|
||||
the current leader to reduce errors in filers and other components.
|
||||
|
||||
Examples:
|
||||
# Transfer to any eligible follower (auto-selection)
|
||||
cluster.raft.transferLeader
|
||||
|
||||
# Transfer to a specific server
|
||||
cluster.raft.transferLeader -id <server_id> -address <server_grpc_address>
|
||||
|
||||
Notes:
|
||||
- Requires hashicorp raft (-raftHashicorp=true on master)
|
||||
- This command must be sent to the current leader
|
||||
- The target server must be a voting member of the raft cluster
|
||||
- Use 'cluster.raft.ps' to list available servers and identify the leader
|
||||
`
|
||||
}
|
||||
|
||||
func (c *commandRaftLeaderTransfer) HasTag(CommandTag) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *commandRaftLeaderTransfer) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
|
||||
leaderTransferCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||
targetId := leaderTransferCommand.String("id", "", "target server id (must be used with -address)")
|
||||
targetAddress := leaderTransferCommand.String("address", "", "target server grpc address (must be used with -id)")
|
||||
|
||||
if err := leaderTransferCommand.Parse(args); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate: id and address must be specified together
|
||||
if *targetId != "" && *targetAddress == "" {
|
||||
return fmt.Errorf("-address is required when -id is specified")
|
||||
}
|
||||
if *targetAddress != "" && *targetId == "" {
|
||||
return fmt.Errorf("-id is required when -address is specified")
|
||||
}
|
||||
|
||||
// First, show current cluster status
|
||||
fmt.Fprintf(writer, "Checking current raft cluster status...\n")
|
||||
|
||||
var currentLeader string
|
||||
err := commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.RaftListClusterServers(ctx, &master_pb.RaftListClusterServersRequest{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list cluster servers: %v", err)
|
||||
}
|
||||
|
||||
if len(resp.ClusterServers) == 0 {
|
||||
fmt.Fprintf(writer, "No raft cluster configured (single master mode)\n")
|
||||
return fmt.Errorf("leadership transfer not available in single master mode")
|
||||
}
|
||||
|
||||
fmt.Fprintf(writer, "Raft cluster has %d servers:\n", len(resp.ClusterServers))
|
||||
for _, server := range resp.ClusterServers {
|
||||
suffix := ""
|
||||
if server.IsLeader {
|
||||
suffix = " <- current leader"
|
||||
currentLeader = server.Id
|
||||
}
|
||||
fmt.Fprintf(writer, " %s %s [%s]%s\n", server.Id, server.Address, server.Suffrage, suffix)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if currentLeader == "" {
|
||||
return fmt.Errorf("no leader found in cluster")
|
||||
}
|
||||
|
||||
// Perform the transfer
|
||||
targetDesc := "any eligible follower"
|
||||
if *targetId != "" {
|
||||
targetDesc = fmt.Sprintf("server %s (%s)", *targetId, *targetAddress)
|
||||
}
|
||||
fmt.Fprintf(writer, "\nTransferring leadership from %s to %s...\n", currentLeader, targetDesc)
|
||||
|
||||
err = commandEnv.MasterClient.WithClient(true, func(client master_pb.SeaweedClient) error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
resp, err := client.RaftLeadershipTransfer(ctx, &master_pb.RaftLeadershipTransferRequest{
|
||||
TargetId: *targetId,
|
||||
TargetAddress: *targetAddress,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("leadership transfer failed: %v", err)
|
||||
}
|
||||
|
||||
if resp.PreviousLeader != resp.NewLeader {
|
||||
fmt.Fprintf(writer, "Leadership successfully transferred.\n")
|
||||
fmt.Fprintf(writer, " Previous leader: %s\n", resp.PreviousLeader)
|
||||
fmt.Fprintf(writer, " New leader: %s\n", resp.NewLeader)
|
||||
} else {
|
||||
fmt.Fprintf(writer, "Leadership transfer initiated, but the same leader was re-elected.\n")
|
||||
fmt.Fprintf(writer, " Current leader: %s\n", resp.NewLeader)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
fmt.Fprintf(writer, "\nLeadership transfer failed: %v\n", err)
|
||||
fmt.Fprintf(writer, "\nTroubleshooting:\n")
|
||||
fmt.Fprintf(writer, " - Ensure you are connected to the current leader\n")
|
||||
fmt.Fprintf(writer, " - Ensure target server is a voting member (use 'cluster.raft.ps')\n")
|
||||
fmt.Fprintf(writer, " - Ensure target server is healthy and reachable\n")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
89
weed/shell/command_cluster_raft_leader_transfer_test.go
Normal file
89
weed/shell/command_cluster_raft_leader_transfer_test.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package shell
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestRaftLeaderTransfer_Name(t *testing.T) {
|
||||
cmd := &commandRaftLeaderTransfer{}
|
||||
expected := "cluster.raft.transferLeader"
|
||||
if cmd.Name() != expected {
|
||||
t.Errorf("expected name %q, got %q", expected, cmd.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftLeaderTransfer_Help(t *testing.T) {
|
||||
cmd := &commandRaftLeaderTransfer{}
|
||||
help := cmd.Help()
|
||||
|
||||
// Verify help text contains key information
|
||||
expectedPhrases := []string{
|
||||
"transfer raft leadership",
|
||||
"cluster.raft.transferLeader",
|
||||
"-id",
|
||||
"-address",
|
||||
"cluster.raft.ps",
|
||||
"-raftHashicorp",
|
||||
}
|
||||
|
||||
for _, phrase := range expectedPhrases {
|
||||
if !strings.Contains(help, phrase) {
|
||||
t.Errorf("help text should contain %q", phrase)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftLeaderTransfer_HasTag(t *testing.T) {
|
||||
cmd := &commandRaftLeaderTransfer{}
|
||||
// The command should not have any special tags
|
||||
if cmd.HasTag(ResourceHeavy) {
|
||||
t.Error("expected HasTag to return false for ResourceHeavy")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftLeaderTransfer_ValidateTargetIdWithoutAddress(t *testing.T) {
|
||||
cmd := &commandRaftLeaderTransfer{}
|
||||
var buf bytes.Buffer
|
||||
|
||||
// Create a mock command environment - this will fail because no master client
|
||||
// but we can verify argument parsing
|
||||
err := cmd.Do([]string{"-id", "test-server"}, nil, &buf)
|
||||
|
||||
// Should fail because -address is required when -id is specified
|
||||
if err == nil {
|
||||
t.Error("expected error when -id is specified without -address")
|
||||
}
|
||||
if err != nil && !strings.Contains(err.Error(), "-address is required") {
|
||||
t.Errorf("expected error about missing -address, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftLeaderTransfer_ValidateTargetAddressWithoutId(t *testing.T) {
|
||||
cmd := &commandRaftLeaderTransfer{}
|
||||
var buf bytes.Buffer
|
||||
|
||||
// Verify argument parsing - address without id should fail
|
||||
err := cmd.Do([]string{"-address", "localhost:19333"}, nil, &buf)
|
||||
|
||||
// Should fail because -id is required when -address is specified
|
||||
if err == nil {
|
||||
t.Error("expected error when -address is specified without -id")
|
||||
}
|
||||
if err != nil && !strings.Contains(err.Error(), "-id is required") {
|
||||
t.Errorf("expected error about missing -id, got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRaftLeaderTransfer_UnknownFlag(t *testing.T) {
|
||||
cmd := &commandRaftLeaderTransfer{}
|
||||
var buf bytes.Buffer
|
||||
|
||||
// Unknown flag should return an error
|
||||
err := cmd.Do([]string{"-unknown-flag"}, nil, &buf)
|
||||
if err == nil {
|
||||
t.Error("expected error for unknown flag")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user