Introduce logic to resolve volume replica placement within EC rebalancing. (#6254)
* Rename `command_ec_encode_test.go` to `command_ec_common_test.go`. All tests defined in this file are now for `command_ec_common.go`. * Minor code cleanups. - Fix broken `ec.balance` test. - Rework integer ceiling division to not use floats, which can introduce precision errors. * Introduce logic to resolve volume replica placement within EC rebalancing. This will be used to make rebalancing logic topology-aware. * Give shell.EcNode.dc a dedicated DataCenterId type.
This commit is contained in:
@@ -3,7 +3,6 @@ package shell
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
||||
"github.com/seaweedfs/seaweedfs/weed/operation"
|
||||
@@ -12,11 +11,32 @@ import (
|
||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
|
||||
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||
"golang.org/x/exp/slices"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type DataCenterId string
|
||||
type EcNodeId string
|
||||
type RackId string
|
||||
|
||||
type EcNode struct {
|
||||
info *master_pb.DataNodeInfo
|
||||
dc DataCenterId
|
||||
rack RackId
|
||||
freeEcSlot int
|
||||
}
|
||||
type CandidateEcNode struct {
|
||||
ecNode *EcNode
|
||||
shardCount int
|
||||
}
|
||||
|
||||
type EcRack struct {
|
||||
ecNodes map[EcNodeId]*EcNode
|
||||
freeEcSlot int
|
||||
}
|
||||
|
||||
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
|
||||
|
||||
if !commandEnv.isLocked() {
|
||||
@@ -68,7 +88,6 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
|
||||
err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
|
||||
|
||||
if targetAddress != existingLocation {
|
||||
|
||||
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
|
||||
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
|
||||
VolumeId: uint32(volumeId),
|
||||
@@ -109,6 +128,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: Make dc a DataCenterId instead of string.
|
||||
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc string, rack RackId, dn *master_pb.DataNodeInfo)) {
|
||||
for _, dc := range topo.DataCenterInfos {
|
||||
for _, rack := range dc.RackInfos {
|
||||
@@ -131,11 +151,6 @@ func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
|
||||
})
|
||||
}
|
||||
|
||||
type CandidateEcNode struct {
|
||||
ecNode *EcNode
|
||||
shardCount int
|
||||
}
|
||||
|
||||
// if the index node changed the freeEcSlot, need to keep every EcNode still sorted
|
||||
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
|
||||
for i := index - 1; i >= 0; i-- {
|
||||
@@ -179,16 +194,6 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c
|
||||
return int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
|
||||
}
|
||||
|
||||
type RackId string
|
||||
type EcNodeId string
|
||||
|
||||
type EcNode struct {
|
||||
info *master_pb.DataNodeInfo
|
||||
dc string
|
||||
rack RackId
|
||||
freeEcSlot int
|
||||
}
|
||||
|
||||
func (ecNode *EcNode) localShardIdCount(vid uint32) int {
|
||||
for _, diskInfo := range ecNode.info.DiskInfos {
|
||||
for _, ecShardInfo := range diskInfo.EcShardInfos {
|
||||
@@ -201,13 +206,7 @@ func (ecNode *EcNode) localShardIdCount(vid uint32) int {
|
||||
return 0
|
||||
}
|
||||
|
||||
type EcRack struct {
|
||||
ecNodes map[EcNodeId]*EcNode
|
||||
freeEcSlot int
|
||||
}
|
||||
|
||||
func collectEcNodes(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
|
||||
|
||||
// list all possible locations
|
||||
// collect topology information
|
||||
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
|
||||
@@ -232,7 +231,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
|
||||
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
|
||||
ecNodes = append(ecNodes, &EcNode{
|
||||
info: dn,
|
||||
dc: dc,
|
||||
dc: DataCenterId(dc),
|
||||
rack: rack,
|
||||
freeEcSlot: int(freeEcSlots),
|
||||
})
|
||||
@@ -283,8 +282,12 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
|
||||
})
|
||||
}
|
||||
|
||||
func ceilDivide(total, n int) int {
|
||||
return int(math.Ceil(float64(total) / float64(n)))
|
||||
func ceilDivide(a, b int) int {
|
||||
var r int
|
||||
if (a % b) != 0 {
|
||||
r = 1
|
||||
}
|
||||
return (a / b) + r
|
||||
}
|
||||
|
||||
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
|
||||
@@ -772,6 +775,21 @@ func collectVolumeIdToEcNodes(allEcNodes []*EcNode, collection string) map[needl
|
||||
return vidLocations
|
||||
}
|
||||
|
||||
func volumeIdToReplicaPlacement(vid needle.VolumeId, nodes []*EcNode) (*super_block.ReplicaPlacement, error) {
|
||||
for _, ecNode := range nodes {
|
||||
for _, diskInfo := range ecNode.info.DiskInfos {
|
||||
for _, volumeInfo := range diskInfo.VolumeInfos {
|
||||
if needle.VolumeId(volumeInfo.Id) != vid {
|
||||
continue
|
||||
}
|
||||
return super_block.NewReplicaPlacementFromByte(byte(volumeInfo.ReplicaPlacement))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to resolve replica placement for volume ID %d", vid)
|
||||
}
|
||||
|
||||
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, applyBalancing bool) (err error) {
|
||||
if len(collections) == 0 {
|
||||
return fmt.Errorf("no collections to balance")
|
||||
|
||||
Reference in New Issue
Block a user