Prevent split-brain: Persistent ClusterID and Join Validation (#8022)
* Prevent split-brain: Persistent ClusterID and Join Validation - Persist ClusterId in Raft store to survive restarts. - Validate ClusterId on Raft command application (piggybacked on MaxVolumeId). - Prevent masters with conflicting ClusterIds from joining/operating together. - Update Telemetry to report the persistent ClusterId. * Refine ClusterID validation based on feedback - Improved error message in cluster_commands.go. - Added ClusterId mismatch check in RaftServer.Recovery. * Handle Raft errors and support Hashicorp Raft for ClusterId - Check for errors when persisting ClusterId in legacy Raft. - Implement ClusterId generation and persistence for Hashicorp Raft leader changes. - Ensure consistent error logging. * Refactor ClusterId validation - Centralize ClusterId mismatch check in Topology.SetClusterId. - Simplify MaxVolumeIdCommand.Apply and RaftServer.Recovery to rely on SetClusterId. * Fix goroutine leak and add timeout - Handle channel closure in Hashicorp Raft leader listener. - Add timeout to Raft Apply call to prevent blocking. * Fix deadlock in legacy Raft listener - Wrap ClusterId generation/persistence in a goroutine to avoid blocking the Raft event loop (deadlock). * Rename ClusterId to SystemId - Renamed ClusterId to SystemId across the codebase (protobuf, topology, server, telemetry). - Regenerated telemetry.pb.go with new field. * Rename SystemId to TopologyId - The rename to SystemId was an intermediate step. - The final name is TopologyId for the persistent cluster identifier. - Updated protobuf, topology, raft server, master server, and telemetry. * Optimize Hashicorp Raft listener - Integrated TopologyId generation into existing monitorLeaderLoop. - Removed extra goroutine in master_server.go. * Fix optimistic TopologyId update - Removed premature local state update of TopologyId in master_server.go and raft_hashicorp.go. - State is now solely updated via the Raft state machine Apply/Restore methods after consensus.
* Add explicit log for recovered TopologyId - Added glog.V(0) info log in RaftServer.Recovery to print the recovered TopologyId on startup. * Add Raft barrier to prevent TopologyId race condition - Implement ensureTopologyId helper method - Send no-op MaxVolumeIdCommand to sync Raft log before checking TopologyId - Ensures persisted TopologyId is recovered before generating new one - Prevents race where generation happens during log replay * Serialize TopologyId generation with mutex - Add topologyIdGenLock mutex to MasterServer struct - Wrap ensureTopologyId method with lock to prevent concurrent generation - Fixes race where event listener and manual leadership check both generate IDs - Second caller waits for first to complete and sees the generated ID * Add TopologyId recovery logging to Apply method - Change log level from V(1) to V(0) for visibility - Log 'Recovered TopologyId' when applying from Raft log - Ensures recovery is visible whether from snapshot or log replay - Matches Recovery() method logging for consistency * Fix Raft barrier timing issue - Add 100ms delay after barrier command to ensure log application completes - Add debug logging to track barrier execution and TopologyId state - Return early if barrier command fails - Prevents TopologyId generation before old logs are fully applied * ensure leader * address comments * address comments * redundant * clean up * double check * refactoring * comment
This commit is contained in:
@@ -12,11 +12,13 @@ import (
|
||||
|
||||
// MaxVolumeIdCommand is the raft command that advances the cluster-wide
// maximum assigned volume id. TopologyId piggybacks the persistent cluster
// identifier on the command so each master can validate it when the command
// is applied (see Topology.SetTopologyId), preventing split-brain.
type MaxVolumeIdCommand struct {
	MaxVolumeId needle.VolumeId `json:"maxVolumeId"`
	TopologyId  string          `json:"topologyId"`
}
func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand {
|
||||
func NewMaxVolumeIdCommand(value needle.VolumeId, topologyId string) *MaxVolumeIdCommand {
|
||||
return &MaxVolumeIdCommand{
|
||||
MaxVolumeId: value,
|
||||
TopologyId: topologyId,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,7 +31,18 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
topo := server.Context().(*Topology)
|
||||
before := topo.GetMaxVolumeId()
|
||||
topo.UpAdjustMaxVolumeId(c.MaxVolumeId)
|
||||
|
||||
if c.TopologyId != "" {
|
||||
prevTopologyId := topo.GetTopologyId()
|
||||
topo.SetTopologyId(c.TopologyId)
|
||||
// Log when TopologyId is set for the first time, with different messages for leader and follower.
|
||||
if prevTopologyId == "" {
|
||||
if server.State() == raft.Leader {
|
||||
glog.V(0).Infof("TopologyId generated and applied on leader: %s", c.TopologyId)
|
||||
} else {
|
||||
glog.V(0).Infof("TopologyId applied on follower: %s", c.TopologyId)
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.V(1).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId())
|
||||
|
||||
return nil, nil
|
||||
|
||||
@@ -57,6 +57,9 @@ type Topology struct {
|
||||
UuidAccessLock sync.RWMutex
|
||||
UuidMap map[string][]string
|
||||
|
||||
topologyId string
|
||||
topologyIdLock sync.RWMutex
|
||||
|
||||
LastLeaderChangeTime time.Time
|
||||
}
|
||||
|
||||
@@ -234,11 +237,11 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) {
|
||||
defer t.RaftServerAccessLock.RUnlock()
|
||||
|
||||
if t.RaftServer != nil {
|
||||
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil {
|
||||
if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next, t.GetTopologyId())); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
} else if t.HashicorpRaft != nil {
|
||||
b, err := json.Marshal(NewMaxVolumeIdCommand(next))
|
||||
b, err := json.Marshal(NewMaxVolumeIdCommand(next, t.GetTopologyId()))
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err)
|
||||
}
|
||||
@@ -468,3 +471,24 @@ func (t *Topology) EnableVacuum() {
|
||||
glog.V(0).Infof("EnableVacuum")
|
||||
t.isDisableVacuum = false
|
||||
}
|
||||
|
||||
func (t *Topology) GetTopologyId() string {
|
||||
t.topologyIdLock.RLock()
|
||||
defer t.topologyIdLock.RUnlock()
|
||||
return t.topologyId
|
||||
}
|
||||
|
||||
func (t *Topology) SetTopologyId(topologyId string) {
|
||||
t.topologyIdLock.Lock()
|
||||
defer t.topologyIdLock.Unlock()
|
||||
if topologyId == "" {
|
||||
return
|
||||
}
|
||||
if t.topologyId == "" {
|
||||
t.topologyId = topologyId
|
||||
return
|
||||
}
|
||||
if t.topologyId != topologyId {
|
||||
glog.Fatalf("Split-brain detected! Current TopologyId is %s, but received %s. Stopping to prevent data corruption.", t.topologyId, topologyId)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user