fix balance fallback replica placement (#8824)
This commit is contained in:
@@ -363,6 +363,23 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
|
|||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Parse replica placement once for use in both scoring and validation.
|
||||||
|
var volumeRP *super_block.ReplicaPlacement
|
||||||
|
if selectedVolume.ExpectedReplicas > 0 && selectedVolume.ExpectedReplicas <= 255 {
|
||||||
|
if parsed, rpErr := super_block.NewReplicaPlacementFromByte(byte(selectedVolume.ExpectedReplicas)); rpErr == nil && parsed.HasReplication() {
|
||||||
|
volumeRP = parsed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var replicas []types.ReplicaLocation
|
||||||
|
if clusterInfo.VolumeReplicaMap != nil {
|
||||||
|
replicas = clusterInfo.VolumeReplicaMap[selectedVolume.VolumeID]
|
||||||
|
if volumeRP != nil && len(replicas) == 0 {
|
||||||
|
glog.V(1).Infof("BALANCE [%s]: No replica locations found for volume %d, skipping placement validation",
|
||||||
|
diskType, selectedVolume.VolumeID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Resolve the target server chosen by the detection loop's effective counts.
|
// Resolve the target server chosen by the detection loop's effective counts.
|
||||||
// This keeps destination selection in sync with the greedy algorithm rather
|
// This keeps destination selection in sync with the greedy algorithm rather
|
||||||
// than relying on topology LoadCount which can diverge across iterations.
|
// than relying on topology LoadCount which can diverge across iterations.
|
||||||
@@ -371,62 +388,43 @@ func createBalanceTask(diskType string, selectedVolume *types.VolumeHealthMetric
|
|||||||
// Fall back to score-based planning if the preferred target can't be resolved
|
// Fall back to score-based planning if the preferred target can't be resolved
|
||||||
glog.V(1).Infof("BALANCE [%s]: Cannot resolve target %s for volume %d, falling back to score-based planning: %v",
|
glog.V(1).Infof("BALANCE [%s]: Cannot resolve target %s for volume %d, falling back to score-based planning: %v",
|
||||||
diskType, targetServer, selectedVolume.VolumeID, err)
|
diskType, targetServer, selectedVolume.VolumeID, err)
|
||||||
destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
|
destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume, volumeRP, replicas, allowedServers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
|
glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the destination is within the filtered scope. When DC/rack/node
|
// Verify the resolved destination. If it falls outside the filtered scope
|
||||||
// filters are active, allowedServers contains only the servers that passed
|
// or violates replica placement, fall back to the score-based planner and
|
||||||
// filtering. The fallback planner queries the full topology, so this check
|
// pick the best candidate that is actually valid.
|
||||||
// prevents out-of-scope targets from leaking through.
|
if !isValidBalanceDestination(destinationPlan, allowedServers, volumeRP, replicas, selectedVolume.Server) {
|
||||||
if len(allowedServers) > 0 {
|
switch {
|
||||||
if _, ok := allowedServers[destinationPlan.TargetNode]; !ok {
|
case destinationPlan == nil:
|
||||||
glog.V(1).Infof("BALANCE [%s]: Planned destination %s for volume %d is outside filtered scope, skipping",
|
glog.V(1).Infof("BALANCE [%s]: Planned destination for volume %d is nil, falling back",
|
||||||
|
diskType, selectedVolume.VolumeID)
|
||||||
|
case !isAllowedBalanceTarget(destinationPlan.TargetNode, allowedServers):
|
||||||
|
glog.V(1).Infof("BALANCE [%s]: Planned destination %s for volume %d is outside filtered scope, falling back",
|
||||||
diskType, destinationPlan.TargetNode, selectedVolume.VolumeID)
|
diskType, destinationPlan.TargetNode, selectedVolume.VolumeID)
|
||||||
|
case volumeRP != nil && len(replicas) > 0:
|
||||||
|
glog.V(1).Infof("BALANCE [%s]: Destination %s violates replica placement for volume %d (rp=%03d), falling back",
|
||||||
|
diskType, destinationPlan.TargetNode, selectedVolume.VolumeID, selectedVolume.ExpectedReplicas)
|
||||||
|
}
|
||||||
|
|
||||||
|
destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume, volumeRP, replicas, allowedServers)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("BALANCE [%s]: Failed to plan fallback destination for volume %d: %v", diskType, selectedVolume.VolumeID, err)
|
||||||
return nil, ""
|
return nil, ""
|
||||||
}
|
}
|
||||||
}
|
if !isValidBalanceDestination(destinationPlan, allowedServers, volumeRP, replicas, selectedVolume.Server) {
|
||||||
|
if destinationPlan == nil {
|
||||||
// Validate move against replica placement policy
|
glog.V(1).Infof("BALANCE [%s]: Fallback destination for volume %d is nil",
|
||||||
if selectedVolume.ExpectedReplicas > 0 && selectedVolume.ExpectedReplicas <= 255 && clusterInfo.VolumeReplicaMap != nil {
|
|
||||||
rpBytes, rpErr := super_block.NewReplicaPlacementFromByte(byte(selectedVolume.ExpectedReplicas))
|
|
||||||
if rpErr == nil && rpBytes.HasReplication() {
|
|
||||||
replicas := clusterInfo.VolumeReplicaMap[selectedVolume.VolumeID]
|
|
||||||
if len(replicas) == 0 {
|
|
||||||
glog.V(1).Infof("BALANCE [%s]: No replica locations found for volume %d, skipping placement validation",
|
|
||||||
diskType, selectedVolume.VolumeID)
|
diskType, selectedVolume.VolumeID)
|
||||||
} else {
|
} else {
|
||||||
validateMove := func(plan *topology.DestinationPlan) bool {
|
glog.V(1).Infof("BALANCE [%s]: Fallback destination %s is not valid for volume %d",
|
||||||
if plan == nil {
|
diskType, destinationPlan.TargetNode, selectedVolume.VolumeID)
|
||||||
return false
|
|
||||||
}
|
|
||||||
target := types.ReplicaLocation{
|
|
||||||
DataCenter: plan.TargetDC,
|
|
||||||
Rack: plan.TargetRack,
|
|
||||||
NodeID: plan.TargetNode,
|
|
||||||
}
|
|
||||||
return IsGoodMove(rpBytes, replicas, selectedVolume.Server, target)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !validateMove(destinationPlan) {
|
|
||||||
glog.V(1).Infof("BALANCE [%s]: Destination %s violates replica placement for volume %d (rp=%03d), falling back",
|
|
||||||
diskType, destinationPlan.TargetNode, selectedVolume.VolumeID, selectedVolume.ExpectedReplicas)
|
|
||||||
// Fall back to score-based planning
|
|
||||||
destinationPlan, err = planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
|
|
||||||
if err != nil {
|
|
||||||
glog.Warningf("BALANCE [%s]: Failed to plan fallback destination for volume %d: %v", diskType, selectedVolume.VolumeID, err)
|
|
||||||
return nil, ""
|
|
||||||
}
|
|
||||||
if !validateMove(destinationPlan) {
|
|
||||||
glog.V(1).Infof("BALANCE [%s]: Fallback destination %s also violates replica placement for volume %d",
|
|
||||||
diskType, destinationPlan.TargetNode, selectedVolume.VolumeID)
|
|
||||||
return nil, ""
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return nil, ""
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -555,7 +553,10 @@ func resolveBalanceDestination(activeTopology *topology.ActiveTopology, selected
|
|||||||
// planBalanceDestination plans the destination for a balance operation using
|
// planBalanceDestination plans the destination for a balance operation using
|
||||||
// score-based selection. Used as a fallback when the preferred target cannot
|
// score-based selection. Used as a fallback when the preferred target cannot
|
||||||
// be resolved, and for single-move scenarios outside the detection loop.
|
// be resolved, and for single-move scenarios outside the detection loop.
|
||||||
func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) {
|
// rp may be nil when the volume has no replication constraint. When replica
|
||||||
|
// locations are known, candidates that would violate placement are filtered
|
||||||
|
// out before scoring.
|
||||||
|
func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics, rp *super_block.ReplicaPlacement, replicas []types.ReplicaLocation, allowedServers map[string]int) (*topology.DestinationPlan, error) {
|
||||||
// Get source node information from topology
|
// Get source node information from topology
|
||||||
var sourceRack, sourceDC string
|
var sourceRack, sourceDC string
|
||||||
|
|
||||||
@@ -604,8 +605,21 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol
|
|||||||
if disk.DiskType != selectedVolume.DiskType {
|
if disk.DiskType != selectedVolume.DiskType {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
if !isAllowedBalanceTarget(disk.NodeID, allowedServers) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if rp != nil && len(replicas) > 0 {
|
||||||
|
target := types.ReplicaLocation{
|
||||||
|
DataCenter: disk.DataCenter,
|
||||||
|
Rack: disk.Rack,
|
||||||
|
NodeID: disk.NodeID,
|
||||||
|
}
|
||||||
|
if !IsGoodMove(rp, replicas, selectedVolume.Server, target) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size)
|
score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size, rp)
|
||||||
if score > bestScore {
|
if score > bestScore {
|
||||||
bestScore = score
|
bestScore = score
|
||||||
bestDisk = disk
|
bestDisk = disk
|
||||||
@@ -636,7 +650,9 @@ func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVol
|
|||||||
// calculateBalanceScore calculates placement score for balance operations.
|
// calculateBalanceScore calculates placement score for balance operations.
|
||||||
// LoadCount reflects pending+assigned tasks on the disk, so we factor it into
|
// LoadCount reflects pending+assigned tasks on the disk, so we factor it into
|
||||||
// the utilization estimate to avoid stacking multiple moves onto the same target.
|
// the utilization estimate to avoid stacking multiple moves onto the same target.
|
||||||
func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 {
|
// rp may be nil when the volume has no replication constraint; in that case the
|
||||||
|
// scorer defaults to preferring cross-rack/DC distribution.
|
||||||
|
func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64, rp *super_block.ReplicaPlacement) float64 {
|
||||||
if disk.DiskInfo == nil {
|
if disk.DiskInfo == nil {
|
||||||
return 0.0
|
return 0.0
|
||||||
}
|
}
|
||||||
@@ -652,17 +668,62 @@ func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string,
|
|||||||
score += (1.0 - utilization) * 50.0 // Up to 50 points for low utilization
|
score += (1.0 - utilization) * 50.0 // Up to 50 points for low utilization
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prefer different racks for better distribution
|
// Rack scoring: respect the replication policy.
|
||||||
if disk.Rack != sourceRack {
|
// If replicas must stay on the same rack (SameRackCount > 0 with no
|
||||||
score += 30.0
|
// cross-rack requirement), prefer same-rack destinations. Otherwise
|
||||||
|
// prefer different racks for better distribution.
|
||||||
|
sameRack := disk.Rack == sourceRack
|
||||||
|
if rp != nil && rp.DiffRackCount == 0 && rp.SameRackCount > 0 {
|
||||||
|
if sameRack {
|
||||||
|
score += 30.0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !sameRack {
|
||||||
|
score += 30.0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prefer different data centers for better distribution
|
// DC scoring: same idea. If the policy requires all copies in one DC,
|
||||||
if disk.DataCenter != sourceDC {
|
// prefer same-DC destinations. Otherwise prefer different DCs.
|
||||||
score += 20.0
|
sameDC := disk.DataCenter == sourceDC
|
||||||
|
if rp != nil && rp.DiffDataCenterCount == 0 && (rp.SameRackCount > 0 || rp.DiffRackCount > 0) {
|
||||||
|
if sameDC {
|
||||||
|
score += 20.0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if !sameDC {
|
||||||
|
score += 20.0
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return score
|
return score
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isAllowedBalanceTarget(nodeID string, allowedServers map[string]int) bool {
|
||||||
|
if len(allowedServers) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
_, ok := allowedServers[nodeID]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func isValidBalanceDestination(plan *topology.DestinationPlan, allowedServers map[string]int, rp *super_block.ReplicaPlacement, replicas []types.ReplicaLocation, sourceNodeID string) bool {
|
||||||
|
if plan == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if !isAllowedBalanceTarget(plan.TargetNode, allowedServers) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if rp == nil || len(replicas) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
target := types.ReplicaLocation{
|
||||||
|
DataCenter: plan.TargetDC,
|
||||||
|
Rack: plan.TargetRack,
|
||||||
|
NodeID: plan.TargetNode,
|
||||||
|
}
|
||||||
|
return IsGoodMove(rp, replicas, sourceNodeID, target)
|
||||||
|
}
|
||||||
|
|
||||||
// parseCSVSet splits a comma-separated string into a set of trimmed, non-empty values.
|
// parseCSVSet splits a comma-separated string into a set of trimmed, non-empty values.
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package balance
|
|||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
"github.com/seaweedfs/seaweedfs/weed/worker/types"
|
||||||
)
|
)
|
||||||
@@ -125,3 +127,112 @@ func TestIsGoodMove_NilReplicaPlacement(t *testing.T) {
|
|||||||
t.Error("nil replica placement should allow any move")
|
t.Error("nil replica placement should allow any move")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCalculateBalanceScore_ReplicationAware(t *testing.T) {
|
||||||
|
disk := func(dc, rack string) *topology.DiskInfo {
|
||||||
|
return &topology.DiskInfo{
|
||||||
|
DataCenter: dc,
|
||||||
|
Rack: rack,
|
||||||
|
DiskInfo: &master_pb.DiskInfo{MaxVolumeCount: 100, VolumeCount: 50},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 001: same-rack replication — should prefer same rack and same DC
|
||||||
|
rp001 := rp(t, "001")
|
||||||
|
sameRack := calculateBalanceScore(disk("dc1", "r1"), "r1", "dc1", 0, rp001)
|
||||||
|
diffRack := calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, rp001)
|
||||||
|
diffDC := calculateBalanceScore(disk("dc2", "r2"), "r1", "dc1", 0, rp001)
|
||||||
|
if sameRack <= diffRack {
|
||||||
|
t.Errorf("001: same-rack score (%v) should exceed different-rack score (%v)", sameRack, diffRack)
|
||||||
|
}
|
||||||
|
if sameRack <= diffDC {
|
||||||
|
t.Errorf("001: same-rack score (%v) should exceed different-DC score (%v)", sameRack, diffDC)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 010: different-rack replication — should prefer different rack, same DC
|
||||||
|
rp010 := rp(t, "010")
|
||||||
|
sameRack = calculateBalanceScore(disk("dc1", "r1"), "r1", "dc1", 0, rp010)
|
||||||
|
diffRack = calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, rp010)
|
||||||
|
if diffRack <= sameRack {
|
||||||
|
t.Errorf("010: different-rack score (%v) should exceed same-rack score (%v)", diffRack, sameRack)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 100: different-DC replication — should prefer different DC
|
||||||
|
rp100 := rp(t, "100")
|
||||||
|
sameDC := calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, rp100)
|
||||||
|
diffDCScore := calculateBalanceScore(disk("dc2", "r2"), "r1", "dc1", 0, rp100)
|
||||||
|
if diffDCScore <= sameDC {
|
||||||
|
t.Errorf("100: different-DC score (%v) should exceed same-DC score (%v)", diffDCScore, sameDC)
|
||||||
|
}
|
||||||
|
|
||||||
|
// nil rp: should prefer cross-rack/DC (default behavior)
|
||||||
|
sameRack = calculateBalanceScore(disk("dc1", "r1"), "r1", "dc1", 0, nil)
|
||||||
|
diffRack = calculateBalanceScore(disk("dc1", "r2"), "r1", "dc1", 0, nil)
|
||||||
|
if diffRack <= sameRack {
|
||||||
|
t.Errorf("nil rp: different-rack score (%v) should exceed same-rack score (%v)", diffRack, sameRack)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPlanBalanceDestination_ChoosesBestValidCompositeDestination(t *testing.T) {
|
||||||
|
servers := []serverSpec{
|
||||||
|
{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
|
||||||
|
{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack3"},
|
||||||
|
{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
|
||||||
|
}
|
||||||
|
volumes := makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 1, withReplicas(11))
|
||||||
|
replicas := []types.ReplicaLocation{
|
||||||
|
loc("dc1", "rack1", "node-a"),
|
||||||
|
loc("dc1", "rack1", "node-d"),
|
||||||
|
loc("dc1", "rack2", "node-e"),
|
||||||
|
}
|
||||||
|
|
||||||
|
plan, err := planBalanceDestination(buildTopology(servers, volumes), volumes[0], rp(t, "011"), replicas, map[string]int{
|
||||||
|
"node-b": 0,
|
||||||
|
"node-c": 0,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("planBalanceDestination failed: %v", err)
|
||||||
|
}
|
||||||
|
if plan.TargetNode != "node-c" {
|
||||||
|
t.Fatalf("expected valid same-rack destination node-c, got %s", plan.TargetNode)
|
||||||
|
}
|
||||||
|
if plan.TargetRack != "rack1" {
|
||||||
|
t.Fatalf("expected rack1 destination, got %s", plan.TargetRack)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCreateBalanceTask_FallbackSelectsValidCompositeDestination(t *testing.T) {
|
||||||
|
servers := []serverSpec{
|
||||||
|
{id: "node-a", diskType: "hdd", diskID: 1, dc: "dc1", rack: "rack1"},
|
||||||
|
{id: "node-b", diskType: "hdd", diskID: 2, dc: "dc1", rack: "rack3"},
|
||||||
|
{id: "node-c", diskType: "hdd", diskID: 3, dc: "dc1", rack: "rack1"},
|
||||||
|
}
|
||||||
|
volumes := makeVolumesWith("node-a", "hdd", "dc1", "rack1", "c1", 1, 1, withReplicas(11))
|
||||||
|
clusterInfo := &types.ClusterInfo{
|
||||||
|
ActiveTopology: buildTopology(servers, volumes),
|
||||||
|
VolumeReplicaMap: map[uint32][]types.ReplicaLocation{
|
||||||
|
1: {
|
||||||
|
loc("dc1", "rack1", "node-a"),
|
||||||
|
loc("dc1", "rack1", "node-d"),
|
||||||
|
loc("dc1", "rack2", "node-e"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
task, destination := createBalanceTask("hdd", volumes[0], clusterInfo, "node-b", map[string]int{
|
||||||
|
"node-b": 0,
|
||||||
|
"node-c": 0,
|
||||||
|
})
|
||||||
|
if task == nil {
|
||||||
|
t.Fatal("expected a balance task")
|
||||||
|
}
|
||||||
|
if destination != "node-c" {
|
||||||
|
t.Fatalf("expected fallback destination node-c, got %s", destination)
|
||||||
|
}
|
||||||
|
if len(task.TypedParams.Targets) != 1 {
|
||||||
|
t.Fatalf("expected 1 target, got %d", len(task.TypedParams.Targets))
|
||||||
|
}
|
||||||
|
if got := task.TypedParams.Targets[0].Node; got != "node-c:8080" {
|
||||||
|
t.Fatalf("expected target node-c:8080, got %s", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user