seaweedFS/weed/shell/command_ec_common.go
Chris Lu 208d7f24f4 Erasure Coding: Ec refactoring (#7396)
* refactor: add ECContext structure to encapsulate EC parameters

- Create ec_context.go with ECContext struct
- NewDefaultECContext() creates context with default 10+4 configuration
- Helper methods: CreateEncoder(), ToExt(), String()
- Foundation for cleaner function signatures
- No behavior change, still uses hardcoded 10+4

* refactor: update ec_encoder.go to use ECContext

- Add WriteEcFilesWithContext() and RebuildEcFilesWithContext() functions
- Keep old functions for backward compatibility (call new versions)
- Update all internal functions to accept ECContext parameter
- Use ctx.DataShards, ctx.ParityShards, ctx.TotalShards consistently
- Use ctx.CreateEncoder() instead of hardcoded reedsolomon.New()
- Use ctx.ToExt() for shard file extensions
- No behavior change, still uses default 10+4 configuration

* refactor: update ec_volume.go to use ECContext

- Add ECContext field to EcVolume struct
- Initialize ECContext with default configuration in NewEcVolume()
- Update LocateEcShardNeedleInterval() to use ECContext.DataShards
- Phase 1: Always uses default 10+4 configuration
- No behavior change

* refactor: add EC shard count fields to VolumeInfo protobuf

- Add data_shards_count field (field 8) to VolumeInfo message
- Add parity_shards_count field (field 9) to VolumeInfo message
- Fields are optional, 0 means use default (10+4)
- Backward compatible: fields added at end
- Phase 1: Foundation for future customization

* refactor: regenerate protobuf Go files with EC shard count fields

- Regenerated volume_server_pb/*.go with new EC fields
- DataShardsCount and ParityShardsCount accessors added to VolumeInfo
- No behavior change, fields not yet used

* refactor: update VolumeEcShardsGenerate to use ECContext

- Create ECContext with default configuration in VolumeEcShardsGenerate
- Use ecCtx.TotalShards and ecCtx.ToExt() in cleanup
- Call WriteEcFilesWithContext() instead of WriteEcFiles()
- Save EC configuration (DataShardsCount, ParityShardsCount) to VolumeInfo
- Log EC context being used
- Phase 1: Always uses default 10+4 configuration
- No behavior change

* fmt

* refactor: update ec_test.go to use ECContext

- Update TestEncodingDecoding to create and use ECContext
- Update validateFiles() to accept ECContext parameter
- Update removeGeneratedFiles() to use ctx.TotalShards and ctx.ToExt()
- Test passes with default 10+4 configuration

* refactor: use EcShardConfig message instead of separate fields

* optimize: pre-calculate row sizes in EC encoding loop

* refactor: replace TotalShards field with Total() method

- Remove TotalShards field from ECContext to avoid field drift
- Add Total() method that computes DataShards + ParityShards
- Update all references to use ctx.Total() instead of ctx.TotalShards
- Read EC config from VolumeInfo when loading EC volumes
- Read data shard count from .vif in VolumeEcShardsToVolume
- Use >= instead of > for exact boundary handling in encoding loops
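
As a rough illustration of why a computed total avoids field drift, here is a minimal sketch. The `ECContext` below is a simplified, hypothetical stand-in with only the two count fields; the real struct in ec_context.go may carry more state.

```go
package main

import "fmt"

// ECContext is a simplified, hypothetical stand-in for the struct described
// in the commit message; only the two shard counts are modeled here.
type ECContext struct {
	DataShards   int
	ParityShards int
}

// Total derives the shard count on demand, so it cannot fall out of sync
// with DataShards and ParityShards the way a stored TotalShards field could.
func (c ECContext) Total() int {
	return c.DataShards + c.ParityShards
}

func main() {
	ctx := ECContext{DataShards: 10, ParityShards: 4}
	fmt.Println(ctx.Total()) // prints 14

	// Changing one count is immediately reflected in the total.
	ctx.ParityShards = 6
	fmt.Println(ctx.Total()) // prints 16
}
```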

* optimize: simplify VolumeEcShardsToVolume to use existing EC context

- Remove redundant CollectEcShards call
- Remove redundant .vif file loading
- Use v.ECContext.DataShards directly (already loaded by NewEcVolume)
- Slice tempShards instead of collecting again

* refactor: rename MaxShardId to MaxShardCount for clarity

- Change from MaxShardId=31 to MaxShardCount=32
- Eliminates confusing +1 arithmetic (MaxShardId+1)
- More intuitive: MaxShardCount directly represents the limit

fix: support custom EC ratios beyond 14 shards in VolumeEcShardsToVolume

- Add MaxShardId constant (31, since ShardBits is uint32)
- Use MaxShardId+1 (32) instead of TotalShardsCount (14) for tempShards buffer
- Prevents panic when slicing for volumes with >14 total shards
- Critical fix for custom EC configurations like 20+10

* fix: add validation for EC shard counts from VolumeInfo

- Validate DataShards/ParityShards are positive and within MaxShardCount
- Prevent zero or invalid values that could cause divide-by-zero
- Fallback to defaults if validation fails, with warning log
- VolumeEcShardsGenerate now preserves existing EC config when regenerating
- Critical safety fix for corrupted or legacy .vif files
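
The validate-then-fall-back behavior described above could look roughly like the sketch below. The function name `resolveShardCounts` and the lower-case constants are illustrative assumptions, not the actual SeaweedFS identifiers; only the rules (positive counts, total within 32, default 10+4) come from the commit message.

```go
package main

import "fmt"

// Illustrative constants mirroring the values named in the commit message.
const (
	defaultDataShards   = 10
	defaultParityShards = 4
	maxShardCount       = 32
)

// resolveShardCounts is a hypothetical helper: it returns the stored counts
// when they are usable, otherwise the 10+4 defaults. The boolean reports
// whether the stored values were accepted, so a caller can log a warning.
func resolveShardCounts(data, parity int) (int, int, bool) {
	if data <= 0 || parity <= 0 || data+parity > maxShardCount {
		return defaultDataShards, defaultParityShards, false
	}
	return data, parity, true
}

func main() {
	fmt.Println(resolveShardCounts(20, 10)) // prints 20 10 true
	fmt.Println(resolveShardCounts(0, 0))   // prints 10 4 false
	fmt.Println(resolveShardCounts(30, 4))  // prints 10 4 false
}
```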

* fix: RebuildEcFiles now loads EC config from .vif file

- Critical: RebuildEcFiles was always using default 10+4 config
- Now loads actual EC config from .vif file when rebuilding shards
- Validates config before use (positive shards, within MaxShardCount)
- Falls back to default if .vif missing or invalid
- Prevents data corruption when rebuilding custom EC volumes

* add: defensive validation for dataShards in VolumeEcShardsToVolume

- Validate dataShards > 0 and <= MaxShardCount before use
- Prevents panic from corrupted or uninitialized ECContext
- Returns clear error message instead of panic
- Defense-in-depth: validates even though upstream should catch issues

* fix: replace TotalShardsCount with MaxShardCount for custom EC ratio support

Critical fixes to support custom EC ratios > 14 shards:

disk_location_ec.go:
- validateEcVolume: Check shards 0-31 instead of 0-13 during validation
- removeEcVolumeFiles: Remove shards 0-31 instead of 0-13 during cleanup

ec_volume_info.go ShardBits methods:
- ShardIds(): Iterate up to MaxShardCount (32) instead of TotalShardsCount (14)
- ToUint32Slice(): Iterate up to MaxShardCount (32)
- IndexToShardId(): Iterate up to MaxShardCount (32)
- MinusParityShards(): Remove shards 10-31 instead of 10-13 (added note about Phase 2)
- Minus() shard size copy: Iterate up to MaxShardCount (32)
- resizeShardSizes(): Iterate up to MaxShardCount (32)

Without these changes:
- Custom EC ratios > 14 total shards would fail validation on startup
- Shards 14-31 would never be discovered or cleaned up
- ShardBits operations would miss shards >= 14

These changes are backward compatible - MaxShardCount (32) includes
the default TotalShardsCount (14), so existing 10+4 volumes work as before.
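
To make the effect of the loop bound concrete, here is a minimal sketch of a bitmask walk. The `shardBits` type and `shardIds` method are simplified stand-ins written for this example, with the bound passed in explicitly; the real ShardBits methods in ec_volume_info.go take no such parameter.

```go
package main

import "fmt"

const (
	totalShardsCount = 14 // default 10+4 layout
	maxShardCount    = 32 // one bit per shard in a uint32
)

// shardBits is a simplified stand-in: bit i is set when shard i is present.
type shardBits uint32

// shardIds lists the shard ids whose bit is set, scanning ids below limit.
func (b shardBits) shardIds(limit int) []int {
	var ids []int
	for id := 0; id < limit; id++ {
		if b&(shardBits(1)<<uint(id)) != 0 {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	bits := shardBits(1<<0 | 1<<13 | 1<<20)

	// Scanning only the default range silently drops shard 20.
	fmt.Println(bits.shardIds(totalShardsCount)) // prints [0 13]

	// Scanning the full bit width finds every shard.
	fmt.Println(bits.shardIds(maxShardCount)) // prints [0 13 20]
}
```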

* fix: replace TotalShardsCount with MaxShardCount in critical data structures

Critical fixes for buffer allocations and loops that must support
custom EC ratios up to 32 shards:

Data Structures:
- store_ec.go:354: Buffer allocation for shard recovery (bufs array)
- topology_ec.go:14: EcShardLocations.Locations fixed array size
- command_ec_rebuild.go:268: EC shard map allocation
- command_ec_common.go:626: Shard-to-locations map allocation

Shard Discovery Loops:
- ec_task.go:378: Loop to find generated shard files
- ec_shard_management.go: All 8 loops that check/count EC shards

These changes are critical because:
1. Buffer allocations sized to 14 would cause index-out-of-bounds panics
   when accessing shards 14-31
2. Fixed arrays sized to 14 would truncate shard location data
3. Loops limited to 0-13 would never discover/manage shards 14-31

Note: command_ec_encode.go:208 intentionally NOT changed - it creates
shard IDs to mount after encoding. In Phase 1 we always generate 14
shards, so this remains TotalShardsCount and will be made dynamic in
Phase 2 based on actual EC context.

Without these fixes, custom EC ratios > 14 total shards would cause:
- Runtime panics (array index out of bounds)
- Data loss (shards 14-31 never discovered/tracked)
- Incomplete shard management (missing shards not detected)

* refactor: move MaxShardCount constant to ec_encoder.go

Moved MaxShardCount from ec_volume_info.go to ec_encoder.go to group it
with other shard count constants (DataShardsCount, ParityShardsCount,
TotalShardsCount). This improves code organization and makes it easier
to understand the relationship between these constants.

Location: ec_encoder.go line 22, between TotalShardsCount and MinTotalDisks

* improve: add defensive programming and better error messages for EC

Code review improvements from CodeRabbit:

1. ShardBits Guardrails (ec_volume_info.go):
   - AddShardId, RemoveShardId: Reject shard IDs >= MaxShardCount
   - HasShardId: Return false for out-of-range shard IDs
   - Prevents silent no-ops from bit shifts with invalid IDs

2. Future-Proof Regex (disk_location_ec.go):
   - Updated regex from \.ec[0-9][0-9] to \.ec\d{2,3}
   - Now matches .ec00 through .ec999 (currently .ec00-.ec31 used)
   - Supports future increases to MaxShardCount beyond 99

3. Better Error Messages (volume_grpc_erasure_coding.go):
   - Include valid range (1..32) in dataShards validation error
   - Helps operators quickly identify the problem

4. Validation Before Save (volume_grpc_erasure_coding.go):
   - Validate ECContext (DataShards > 0, ParityShards > 0, Total <= MaxShardCount)
   - Log EC config being saved to .vif for debugging
   - Prevents writing invalid configs to disk

These changes improve robustness and debuggability without changing
core functionality.
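
The regex change in item 2 can be checked with a short sketch. Both patterns are anchored with `$` here so the difference is visible; whether the pattern in disk_location_ec.go is anchored is not stated in this message, so treat the anchoring and the variable names as assumptions.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// Old form: exactly two digits after ".ec" (anchoring added for this example).
	oldShardExt = regexp.MustCompile(`\.ec[0-9][0-9]$`)
	// New form: two or three digits after ".ec" (anchoring added for this example).
	newShardExt = regexp.MustCompile(`\.ec\d{2,3}$`)
)

func main() {
	for _, name := range []string{"7.ec31", "7.ec100", "7.ecx"} {
		fmt.Println(name, oldShardExt.MatchString(name), newShardExt.MatchString(name))
	}
	// prints:
	// 7.ec31 true true
	// 7.ec100 false true
	// 7.ecx false false
}
```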

* fmt

* fix: critical bugs from code review + clean up comments

Critical bug fixes:
1. command_ec_rebuild.go: Fixed indentation causing compilation error
   - Properly nested if/for blocks in registerEcNode

2. ec_shard_management.go: Fixed isComplete logic incorrectly using MaxShardCount
   - Changed from MaxShardCount (32) back to TotalShardsCount (14)
   - Default 10+4 volumes were being incorrectly reported as incomplete
   - Shards 14-31 were being incorrectly reported as missing
   - Fixed in 4 locations: volume completeness checks and getMissingShards

3. ec_volume_info.go: Fixed MinusParityShards removing too many shards
   - Changed from MaxShardCount (32) back to TotalShardsCount (14)
   - Was incorrectly removing shard IDs 10-31 instead of just 10-13

Comment cleanup:
- Removed Phase 1/Phase 2 references (development plan context)
- Replaced with clear statements about default 10+4 configuration
- SeaweedFS repo uses fixed 10+4 EC ratio, no phases needed

Root cause: Over-aggressive replacement of TotalShardsCount with MaxShardCount.
MaxShardCount (32) is the limit for buffer allocations and shard ID loops,
but TotalShardsCount (14) must be used for default EC configuration logic.

* fix: add defensive bounds checks and compute actual shard counts

Critical fixes from code review:

1. topology_ec.go: Add defensive bounds checks to AddShard/DeleteShard
   - Prevent panic when shardId >= MaxShardCount (32)
   - Return false instead of crashing on out-of-range shard IDs

2. command_ec_common.go: Fix doBalanceEcShardsAcrossRacks
   - Was using hardcoded TotalShardsCount (14) for all volumes
   - Now computes actual totalShardsForVolume from rackToShardCount
   - Fixes incorrect rebalancing for volumes with custom EC ratios
   - Example: 5+2=7 shards would incorrectly use 14 as average

These fixes improve robustness and prepare for future custom EC ratios
without changing current behavior for default 10+4 volumes.

Note: MinusParityShards and ec_task.go intentionally NOT changed for
seaweedfs repo - these will be enhanced in seaweed-enterprise repo
where custom EC ratio configuration is added.
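
The 5+2 example in item 2 can be worked through with a small sketch. `ceilDivide` is copied from command_ec_common.go; `overflowPerRack` is a hypothetical helper written for this illustration, and it divides by the number of racks that hold shards, whereas the real function divides by the number of all known racks.

```go
package main

import "fmt"

// ceilDivide rounds the integer quotient up; b must be non-zero.
func ceilDivide(a, b int) int {
	var r int
	if (a % b) != 0 {
		r = 1
	}
	return (a / b) + r
}

// overflowPerRack is a hypothetical helper: for each rack holding more than
// the per-rack average, it reports how many shards should move elsewhere.
func overflowPerRack(rackToShardCount map[string]int, totalShards int) map[string]int {
	average := ceilDivide(totalShards, len(rackToShardCount))
	overflow := make(map[string]int)
	for rack, count := range rackToShardCount {
		if count > average {
			overflow[rack] = count - average
		}
	}
	return overflow
}

func main() {
	// A 5+2 volume: 7 shards, badly skewed toward rackA.
	racks := map[string]int{"rackA": 5, "rackB": 1, "rackC": 1}

	// Actual total: average is ceil(7/3) = 3, so rackA must give up 2 shards.
	fmt.Println(overflowPerRack(racks, 7)) // prints map[rackA:2]

	// Hardcoded 14: average is ceil(14/3) = 5, so the skew goes unnoticed.
	fmt.Println(overflowPerRack(racks, 14)) // prints map[]
}
```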

* fmt

* style: make MaxShardCount type casting explicit in loops

Improved code clarity by explicitly casting MaxShardCount to the
appropriate type when used in loop comparisons:

- ShardId comparisons: Cast to ShardId(MaxShardCount)
- uint32 comparisons: Cast to uint32(MaxShardCount)

Changed in 5 locations:
- Minus() loop (line 90)
- ShardIds() loop (line 143)
- ToUint32Slice() loop (line 152)
- IndexToShardId() loop (line 219)
- resizeShardSizes() loop (line 248)

This makes the intent explicit and improves type safety readability.
No functional changes - purely a style improvement.
2025-10-27 22:13:31 -07:00


package shell
import (
"context"
"errors"
"fmt"
"math/rand/v2"
"regexp"
"slices"
"sort"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
)
type DataCenterId string
type EcNodeId string
type RackId string
type EcNode struct {
info *master_pb.DataNodeInfo
dc DataCenterId
rack RackId
freeEcSlot int
}
type CandidateEcNode struct {
ecNode *EcNode
shardCount int
}
type EcRack struct {
ecNodes map[EcNodeId]*EcNode
freeEcSlot int
}
var (
ecBalanceAlgorithmDescription = `
func EcBalance() {
for each collection:
balanceEcVolumes(collectionName)
for each rack:
balanceEcRack(rack)
}
func balanceEcVolumes(collectionName){
for each volume:
doDeduplicateEcShards(volumeId)
tracks rack~shardCount mapping
for each volume:
doBalanceEcShardsAcrossRacks(volumeId)
for each volume:
doBalanceEcShardsWithinRacks(volumeId)
}
// spread ec shards into more racks
func doBalanceEcShardsAcrossRacks(volumeId){
tracks rack~volumeIdShardCount mapping
averageShardsPerEcRack = totalShardNumber / numRacks // totalShardNumber is 14 for now, later could vary for each dc
ecShardsToMove = select overflown ec shards from racks with ec shard counts > averageShardsPerEcRack
for each ecShardsToMove {
destRack = pickOneRack(rack~shardCount, rack~volumeIdShardCount, ecShardReplicaPlacement)
destVolumeServers = volume servers on the destRack
pickOneEcNodeAndMoveOneShard(destVolumeServers)
}
}
func doBalanceEcShardsWithinRacks(volumeId){
racks = collect all racks that the volume id is on
for rack, shards := range racks
doBalanceEcShardsWithinOneRack(volumeId, shards, rack)
}
// move ec shards
func doBalanceEcShardsWithinOneRack(volumeId, shards, rackId){
tracks volumeServer~volumeIdShardCount mapping
averageShardCount = len(shards) / numVolumeServers
volumeServersOverAverage = volume servers with volumeId's ec shard counts > averageShardCount
ecShardsToMove = select overflown ec shards from volumeServersOverAverage
for each ecShardsToMove {
destVolumeServer = pickOneVolumeServer(volumeServer~shardCount, volumeServer~volumeIdShardCount, ecShardReplicaPlacement)
pickOneEcNodeAndMoveOneShard(destVolumeServer)
}
}
// move ec shards while keeping shard distribution for the same volume unchanged or more even
func balanceEcRack(rack){
averageShardCount = total shards / numVolumeServers
for hasMovedOneEcShard {
sort all volume servers ordered by the number of local ec shards
pick the volume server A with the lowest number of ec shards x
pick the volume server B with the highest number of ec shards y
if y > averageShardCount and x +1 <= averageShardCount {
if B has an ec shard with volume id v that A does not have {
move one ec shard v from B to A
hasMovedOneEcShard = true
}
}
}
}
`
// Overridable functions for testing.
getDefaultReplicaPlacement = _getDefaultReplicaPlacement
)
func _getDefaultReplicaPlacement(commandEnv *CommandEnv) (*super_block.ReplicaPlacement, error) {
var resp *master_pb.GetMasterConfigurationResponse
var err error
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
return err
})
if err != nil {
return nil, err
}
return super_block.NewReplicaPlacementFromString(resp.DefaultReplication)
}
func parseReplicaPlacementArg(commandEnv *CommandEnv, replicaStr string) (*super_block.ReplicaPlacement, error) {
var rp *super_block.ReplicaPlacement
var err error
if replicaStr != "" {
rp, err = super_block.NewReplicaPlacementFromString(replicaStr)
if err != nil {
return rp, err
}
fmt.Printf("using replica placement %q for EC volumes\n", rp.String())
} else {
// No replica placement argument provided, resolve from master default settings.
rp, err = getDefaultReplicaPlacement(commandEnv)
if err != nil {
return rp, err
}
fmt.Printf("using master default replica placement %q for EC volumes\n", rp.String())
}
return rp, nil
}
func collectTopologyInfo(commandEnv *CommandEnv, delayBeforeCollecting time.Duration) (topoInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, err error) {
if delayBeforeCollecting > 0 {
time.Sleep(delayBeforeCollecting)
}
var resp *master_pb.VolumeListResponse
err = commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
resp, err = client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
return err
})
if err != nil {
return
}
return resp.TopologyInfo, resp.VolumeSizeLimitMb, nil
}
func collectEcNodesForDC(commandEnv *CommandEnv, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
// list all possible locations
// collect topology information
topologyInfo, _, err := collectTopologyInfo(commandEnv, 0)
if err != nil {
return
}
// collect all volume servers in the selected data center, along with their free ec slots.
ecNodes, totalFreeEcSlots = collectEcVolumeServersByDc(topologyInfo, selectedDataCenter)
sortEcNodesByFreeslotsDescending(ecNodes)
return
}
func collectEcNodes(commandEnv *CommandEnv) (ecNodes []*EcNode, totalFreeEcSlots int, err error) {
return collectEcNodesForDC(commandEnv, "")
}
func collectCollectionsForVolumeIds(t *master_pb.TopologyInfo, vids []needle.VolumeId) []string {
if len(vids) == 0 {
return nil
}
found := map[string]bool{}
for _, dc := range t.DataCenterInfos {
for _, r := range dc.RackInfos {
for _, dn := range r.DataNodeInfos {
for _, diskInfo := range dn.DiskInfos {
for _, vi := range diskInfo.VolumeInfos {
for _, vid := range vids {
if needle.VolumeId(vi.Id) == vid {
found[vi.Collection] = true
}
}
}
for _, ecs := range diskInfo.EcShardInfos {
for _, vid := range vids {
if needle.VolumeId(ecs.Id) == vid {
found[ecs.Collection] = true
}
}
}
}
}
}
}
if len(found) == 0 {
return nil
}
collections := []string{}
for k := range found {
collections = append(collections, k)
}
sort.Strings(collections)
return collections
}
func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, destinationEcNode *EcNode, applyBalancing bool) (err error) {
if !commandEnv.isLocked() {
return fmt.Errorf("lock is lost")
}
copiedShardIds := []uint32{uint32(shardId)}
if applyBalancing {
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress)
if err != nil {
return err
}
// unmount the to be deleted shards
err = unmountEcShards(commandEnv.option.GrpcDialOption, vid, existingServerAddress, copiedShardIds)
if err != nil {
return err
}
// ask source node to delete the shard, and maybe the ecx file
err = sourceServerDeleteEcShards(commandEnv.option.GrpcDialOption, collection, vid, existingServerAddress, copiedShardIds)
if err != nil {
return err
}
fmt.Printf("moved ec shard %d.%d %s => %s\n", vid, shardId, existingLocation.info.Id, destinationEcNode.info.Id)
}
destinationEcNode.addEcVolumeShards(vid, collection, copiedShardIds)
existingLocation.deleteEcVolumeShards(vid, copiedShardIds)
return nil
}
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress) (copiedShardIds []uint32, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
targetAddress := pb.NewServerAddressFromDataNode(targetServer.info)
err = operation.WithVolumeServerClient(false, targetAddress, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
if targetAddress != existingLocation {
fmt.Printf("copy %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
CopyEcxFile: true,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(existingLocation),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %w", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id, copyErr)
}
}
fmt.Printf("mount %d.%v on %s\n", volumeId, shardIdsToCopy, targetServer.info.Id)
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
})
if mountErr != nil {
return fmt.Errorf("mount %d.%v on %s : %w", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
}
if targetAddress != existingLocation {
copiedShardIds = shardIdsToCopy
glog.V(0).Infof("%s ec volume %d deletes shards %+v", existingLocation, volumeId, copiedShardIds)
}
return nil
})
return
}
func eachDataNode(topo *master_pb.TopologyInfo, fn func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo)) {
for _, dc := range topo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, dn := range rack.DataNodeInfos {
fn(DataCenterId(dc.Id), RackId(rack.Id), dn)
}
}
}
}
func sortEcNodesByFreeslotsDescending(ecNodes []*EcNode) {
slices.SortFunc(ecNodes, func(a, b *EcNode) int {
return b.freeEcSlot - a.freeEcSlot
})
}
func sortEcNodesByFreeslotsAscending(ecNodes []*EcNode) {
slices.SortFunc(ecNodes, func(a, b *EcNode) int {
return a.freeEcSlot - b.freeEcSlot
})
}
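// ensureSortedEcNodes restores the sort order after the sort key of the element at index has changed,
// by swapping it toward the front or the back until its neighbors are in order again.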
func ensureSortedEcNodes(data []*CandidateEcNode, index int, lessThan func(i, j int) bool) {
for i := index - 1; i >= 0; i-- {
if lessThan(i+1, i) {
swap(data, i, i+1)
} else {
break
}
}
for i := index + 1; i < len(data); i++ {
if lessThan(i, i-1) {
swap(data, i, i-1)
} else {
break
}
}
}
func swap(data []*CandidateEcNode, i, j int) {
data[i], data[j] = data[j], data[i]
}
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
for _, ecShardInfo := range ecShardInfos {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
count += shardBits.ShardIdCount()
}
return
}
func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (count int) {
if dn.DiskInfos == nil {
return 0
}
diskInfo := dn.DiskInfos[string(diskType)]
if diskInfo == nil {
return 0
}
slots := int(diskInfo.MaxVolumeCount-diskInfo.VolumeCount)*erasure_coding.DataShardsCount - countShards(diskInfo.EcShardInfos)
if slots < 0 {
return 0
}
return slots
}
func (ecNode *EcNode) localShardIdCount(vid uint32) int {
for _, diskInfo := range ecNode.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
if vid == ecShardInfo.Id {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
return shardBits.ShardIdCount()
}
}
}
return 0
}
func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter string) (ecNodes []*EcNode, totalFreeEcSlots int) {
eachDataNode(topo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if selectedDataCenter != "" && selectedDataCenter != string(dc) {
return
}
freeEcSlots := countFreeShardSlots(dn, types.HardDriveType)
ecNodes = append(ecNodes, &EcNode{
info: dn,
dc: dc,
rack: rack,
freeEcSlot: freeEcSlots,
})
totalFreeEcSlots += freeEcSlots
})
return
}
func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeDeletedShardIds,
})
return deleteErr
})
}
func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedShardIds []uint32) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedShardIds, sourceLocation)
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, unmountErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedShardIds,
})
return unmountErr
})
}
func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedShardIds []uint32) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedShardIds, sourceLocation)
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeMountedShardIds,
})
return mountErr
})
}
// ceilDivide returns a / b rounded up. b must be non-zero.
func ceilDivide(a, b int) int {
var r int
if (a % b) != 0 {
r = 1
}
return (a / b) + r
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId) erasure_coding.ShardBits {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
}
}
}
return 0
}
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32) *EcNode {
foundVolume := false
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
newShardBits := oldShardBits
for _, shardId := range shardIds {
newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
}
shardInfo.EcIndexBits = uint32(newShardBits)
ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
foundVolume = true
break
}
}
} else {
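diskInfo = &master_pb.DiskInfo{
Type: string(types.HardDriveType),
}
// Writing to a nil map panics, so make sure the map exists first.
if ecNode.info.DiskInfos == nil {
ecNode.info.DiskInfos = make(map[string]*master_pb.DiskInfo)
}
ecNode.info.DiskInfos[string(types.HardDriveType)] = diskInfo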
}
if !foundVolume {
var newShardBits erasure_coding.ShardBits
for _, shardId := range shardIds {
newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
}
diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
DiskType: string(types.HardDriveType),
})
ecNode.freeEcSlot -= len(shardIds)
}
return ecNode
}
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32) *EcNode {
if diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
newShardBits := oldShardBits
for _, shardId := range shardIds {
newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
}
shardInfo.EcIndexBits = uint32(newShardBits)
ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
}
}
}
return ecNode
}
func groupByCount(data []*EcNode, identifierFn func(*EcNode) (id string, count int)) map[string]int {
countMap := make(map[string]int)
for _, d := range data {
id, count := identifierFn(d)
countMap[id] += count
}
return countMap
}
func groupBy(data []*EcNode, identifierFn func(*EcNode) (id string)) map[string][]*EcNode {
groupMap := make(map[string][]*EcNode)
for _, d := range data {
id := identifierFn(d)
groupMap[id] = append(groupMap[id], d)
}
return groupMap
}
type ecBalancer struct {
commandEnv *CommandEnv
ecNodes []*EcNode
replicaPlacement *super_block.ReplicaPlacement
applyBalancing bool
maxParallelization int
}
func (ecb *ecBalancer) errorWaitGroup() *ErrorWaitGroup {
return NewErrorWaitGroup(ecb.maxParallelization)
}
func (ecb *ecBalancer) racks() map[RackId]*EcRack {
racks := make(map[RackId]*EcRack)
for _, ecNode := range ecb.ecNodes {
if racks[ecNode.rack] == nil {
racks[ecNode.rack] = &EcRack{
ecNodes: make(map[EcNodeId]*EcNode),
}
}
racks[ecNode.rack].ecNodes[EcNodeId(ecNode.info.Id)] = ecNode
racks[ecNode.rack].freeEcSlot += ecNode.freeEcSlot
}
return racks
}
func (ecb *ecBalancer) balanceEcVolumes(collection string) error {
fmt.Printf("balanceEcVolumes %s\n", collection)
if err := ecb.deleteDuplicatedEcShards(collection); err != nil {
return fmt.Errorf("delete duplicated collection %s ec shards: %v", collection, err)
}
if err := ecb.balanceEcShardsAcrossRacks(collection); err != nil {
return fmt.Errorf("balance across racks collection %s ec shards: %v", collection, err)
}
if err := ecb.balanceEcShardsWithinRacks(collection); err != nil {
return fmt.Errorf("balance within racks collection %s ec shards: %v", collection, err)
}
return nil
}
func (ecb *ecBalancer) deleteDuplicatedEcShards(collection string) error {
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
ewg := ecb.errorWaitGroup()
for vid, locations := range vidLocations {
ewg.Add(func() error {
return ecb.doDeduplicateEcShards(collection, vid, locations)
})
}
return ewg.Wait()
}
func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.VolumeId, locations []*EcNode) error {
// collect, for each shard id, every node that holds a copy, so that extra copies can be removed
// Use MaxShardCount (32) to support custom EC ratios
shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid)
for _, shardId := range shardBits.ShardIds() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
}
for shardId, ecNodes := range shardToLocations {
if len(ecNodes) <= 1 {
continue
}
sortEcNodesByFreeslotsAscending(ecNodes)
fmt.Printf("ec shard %d.%d has %d copies, keeping %v\n", vid, shardId, len(ecNodes), ecNodes[0].info.Id)
if !ecb.applyBalancing {
continue
}
duplicatedShardIds := []uint32{uint32(shardId)}
for _, ecNode := range ecNodes[1:] {
if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
if err := sourceServerDeleteEcShards(ecb.commandEnv.option.GrpcDialOption, collection, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
}
ecNode.deleteEcVolumeShards(vid, duplicatedShardIds)
}
}
return nil
}
func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
// spread the ec shards evenly
ewg := ecb.errorWaitGroup()
for vid, locations := range vidLocations {
ewg.Add(func() error {
return ecb.doBalanceEcShardsAcrossRacks(collection, vid, locations)
})
}
return ewg.Wait()
}
func countShardsByRack(vid needle.VolumeId, locations []*EcNode) map[string]int {
return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
shardBits := findEcVolumeShards(ecNode, vid)
return string(ecNode.rack), shardBits.ShardIdCount()
})
}
func (ecb *ecBalancer) doBalanceEcShardsAcrossRacks(collection string, vid needle.VolumeId, locations []*EcNode) error {
racks := ecb.racks()
// count how many racks hold this volume's shards, and how many shards each rack holds
rackToShardCount := countShardsByRack(vid, locations)
// Calculate actual total shards for this volume (not hardcoded default)
var totalShardsForVolume int
for _, count := range rackToShardCount {
totalShardsForVolume += count
}
// calculate average number of shards an ec rack should have for one volume
averageShardsPerEcRack := ceilDivide(totalShardsForVolume, len(racks))
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
// ecShardsToMove = the surplus ec shards picked from racks holding more than averageShardsPerEcRack shards
ecShardsToMove := make(map[erasure_coding.ShardId]*EcNode)
for rackId, count := range rackToShardCount {
if count <= averageShardsPerEcRack {
continue
}
possibleEcNodes := rackEcNodesWithVid[rackId]
for shardId, ecNode := range pickNEcShardsToMoveFrom(possibleEcNodes, vid, count-averageShardsPerEcRack) {
ecShardsToMove[shardId] = ecNode
}
}
for shardId, ecNode := range ecShardsToMove {
rackId, err := ecb.pickRackToBalanceShardsInto(racks, rackToShardCount)
if err != nil {
fmt.Printf("ec shard %d.%d at %s can not find a destination rack:\n%s\n", vid, shardId, ecNode.info.Id, err.Error())
continue
}
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[rackId].ecNodes {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
err = ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
if err != nil {
return err
}
rackToShardCount[string(rackId)]++
rackToShardCount[string(ecNode.rack)]--
racks[rackId].freeEcSlot++
racks[ecNode.rack].freeEcSlot--
}
return nil
}
// pickRackToBalanceShardsInto picks, at random, one of the eligible racks that
// currently hold the fewest shards of the volume. Racks without free slots, or
// holding more shards than the replica placement allows, are skipped. If no
// rack is eligible, the returned error lists why each rack was skipped.
func (ecb *ecBalancer) pickRackToBalanceShardsInto(rackToEcNodes map[RackId]*EcRack, rackToShardCount map[string]int) (RackId, error) {
targets := []RackId{}
targetShards := -1
for _, shards := range rackToShardCount {
if shards > targetShards {
targetShards = shards
}
}
details := ""
for rackId, rack := range rackToEcNodes {
shards := rackToShardCount[string(rackId)]
if rack.freeEcSlot <= 0 {
details += fmt.Sprintf(" Skipped %s because it has no free slots\n", rackId)
continue
}
if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.DiffRackCount {
details += fmt.Sprintf(" Skipped %s because shards %d > replica placement limit for other racks (%d)\n", rackId, shards, ecb.replicaPlacement.DiffRackCount)
continue
}
if shards < targetShards {
// Favor racks with fewer shards, to ensure a uniform distribution.
targets = nil
targetShards = shards
}
if shards == targetShards {
targets = append(targets, rackId)
}
}
if len(targets) == 0 {
return "", errors.New(details)
}
return targets[rand.IntN(len(targets))], nil
}
// balanceEcShardsWithinRacks spreads each volume's shards evenly among the
// nodes of every rack that holds them. Only nodes with a hard drive disk are
// considered as destinations.
func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
// collect vid => []ecNode, since previous steps can change the locations
vidLocations := ecb.collectVolumeIdToEcNodes(collection)
racks := ecb.racks()
// spread the ec shards evenly
ewg := ecb.errorWaitGroup()
for vid, locations := range vidLocations {
// count how many racks hold this volume's shards, and how many shards each rack holds
rackToShardCount := countShardsByRack(vid, locations)
rackEcNodesWithVid := groupBy(locations, func(ecNode *EcNode) string {
return string(ecNode.rack)
})
for rackId := range rackToShardCount {
var possibleDestinationEcNodes []*EcNode
for _, n := range racks[RackId(rackId)].ecNodes {
if _, found := n.info.DiskInfos[string(types.HardDriveType)]; found {
possibleDestinationEcNodes = append(possibleDestinationEcNodes, n)
}
}
sourceEcNodes := rackEcNodesWithVid[rackId]
averageShardsPerEcNode := ceilDivide(rackToShardCount[rackId], len(possibleDestinationEcNodes))
ewg.Add(func() error {
return ecb.doBalanceEcShardsWithinOneRack(averageShardsPerEcNode, collection, vid, sourceEcNodes, possibleDestinationEcNodes)
})
}
}
return ewg.Wait()
}
// doBalanceEcShardsWithinOneRack moves shards of volume vid off every node that
// holds more than averageShardsPerEcNode of them, one shard at a time, until
// the node is no longer over the limit.
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
for _, ecNode := range existingLocations {
shardBits := findEcVolumeShards(ecNode, vid)
overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
for _, shardId := range shardBits.ShardIds() {
if overLimitCount <= 0 {
break
}
fmt.Printf("%s has %d overlimit, moving ec shard %d.%d\n", ecNode.info.Id, overLimitCount, vid, shardId)
err := ecb.pickOneEcNodeAndMoveOneShard(ecNode, collection, vid, shardId, possibleDestinationEcNodes)
if err != nil {
return err
}
overLimitCount--
}
}
return nil
}
// balanceEcRacks evens out the total EC shard count among the nodes of each
// rack, across all volumes.
func (ecb *ecBalancer) balanceEcRacks() error {
// balance one rack for all ec shards
ewg := ecb.errorWaitGroup()
for _, ecRack := range ecb.racks() {
ewg.Add(func() error {
return ecb.doBalanceEcRack(ecRack)
})
}
return ewg.Wait()
}
// doBalanceEcRack repeatedly moves one shard from the node with the fewest free
// slots to the node with the most free slots, for as long as the former is
// above the per-node average and the latter would stay within it. Only shards
// of volumes the destination node does not already hold are moved.
func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
if len(ecRack.ecNodes) <= 1 {
return nil
}
var rackEcNodes []*EcNode
for _, node := range ecRack.ecNodes {
rackEcNodes = append(rackEcNodes, node)
}
ecNodeIdToShardCount := groupByCount(rackEcNodes, func(ecNode *EcNode) (id string, count int) {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
if !found {
return
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
}
return ecNode.info.Id, count
})
var totalShardCount int
for _, count := range ecNodeIdToShardCount {
totalShardCount += count
}
averageShardCount := ceilDivide(totalShardCount, len(rackEcNodes))
hasMove := true
for hasMove {
hasMove = false
slices.SortFunc(rackEcNodes, func(a, b *EcNode) int {
return b.freeEcSlot - a.freeEcSlot
})
emptyNode, fullNode := rackEcNodes[0], rackEcNodes[len(rackEcNodes)-1]
emptyNodeShardCount, fullNodeShardCount := ecNodeIdToShardCount[emptyNode.info.Id], ecNodeIdToShardCount[fullNode.info.Id]
if fullNodeShardCount > averageShardCount && emptyNodeShardCount+1 <= averageShardCount {
emptyNodeIds := make(map[uint32]bool)
if emptyDiskInfo, found := emptyNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shards := range emptyDiskInfo.EcShardInfos {
emptyNodeIds[shards.Id] = true
}
}
if fullDiskInfo, found := fullNode.info.DiskInfos[string(types.HardDriveType)]; found {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, needle.VolumeId(shards.Id), shardId, emptyNode, ecb.applyBalancing)
if err != nil {
return err
}
ecNodeIdToShardCount[emptyNode.info.Id]++
ecNodeIdToShardCount[fullNode.info.Id]--
hasMove = true
break
}
break
}
}
}
}
}
return nil
}
// pickEcNodeToBalanceShardsInto picks, at random, one of the eligible
// destination nodes that currently hold the fewest shards of volume vid. The
// source node, nodes without free slots, and nodes over the replica placement
// limit are skipped. If no node is eligible, the returned error lists why each
// node was skipped.
func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existingLocation *EcNode, possibleDestinations []*EcNode) (*EcNode, error) {
if existingLocation == nil {
return nil, fmt.Errorf("INTERNAL: missing source nodes")
}
if len(possibleDestinations) == 0 {
return nil, fmt.Errorf("INTERNAL: missing destination nodes")
}
nodeShards := map[*EcNode]int{}
for _, node := range possibleDestinations {
nodeShards[node] = findEcVolumeShards(node, vid).ShardIdCount()
}
targets := []*EcNode{}
targetShards := -1
for _, shards := range nodeShards {
if shards > targetShards {
targetShards = shards
}
}
details := ""
for _, node := range possibleDestinations {
if node.info.Id == existingLocation.info.Id {
continue
}
if node.freeEcSlot <= 0 {
details += fmt.Sprintf(" Skipped %s because it has no free slots\n", node.info.Id)
continue
}
shards := nodeShards[node]
if ecb.replicaPlacement != nil && shards > ecb.replicaPlacement.SameRackCount+1 {
details += fmt.Sprintf(" Skipped %s because shards %d > replica placement limit for the rack (%d + 1)\n", node.info.Id, shards, ecb.replicaPlacement.SameRackCount)
continue
}
if shards < targetShards {
// Favor nodes with fewer shards, to ensure a uniform distribution.
targets = nil
targetShards = shards
}
if shards == targetShards {
targets = append(targets, node)
}
}
if len(targets) == 0 {
return nil, errors.New(details)
}
return targets[rand.IntN(len(targets))], nil
}
// pickOneEcNodeAndMoveOneShard picks a destination node for one shard and moves
// it there. Failing to find a destination is reported as a warning only; the
// function then returns nil so that balancing continues with other shards.
func (ecb *ecBalancer) pickOneEcNodeAndMoveOneShard(existingLocation *EcNode, collection string, vid needle.VolumeId, shardId erasure_coding.ShardId, possibleDestinationEcNodes []*EcNode) error {
destNode, err := ecb.pickEcNodeToBalanceShardsInto(vid, existingLocation, possibleDestinationEcNodes)
if err != nil {
fmt.Printf("WARNING: Could not find suitable target node for %d.%d:\n%s", vid, shardId, err.Error())
return nil
}
fmt.Printf("%s moves ec shard %d.%d to %s\n", existingLocation.info.Id, vid, shardId, destNode.info.Id)
return moveMountedShardToEcNode(ecb.commandEnv, existingLocation, collection, vid, shardId, destNode, ecb.applyBalancing)
}
// pickNEcShardsToMoveFrom selects up to n shards of volume vid to move away,
// taking one shard at a time from the node that currently holds the most.
// Note that each picked shard is also removed from the node's in-memory state.
func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int) map[erasure_coding.ShardId]*EcNode {
picked := make(map[erasure_coding.ShardId]*EcNode)
var candidateEcNodes []*CandidateEcNode
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid)
if shardBits.ShardIdCount() > 0 {
candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
ecNode: ecNode,
shardCount: shardBits.ShardIdCount(),
})
}
}
slices.SortFunc(candidateEcNodes, func(a, b *CandidateEcNode) int {
return b.shardCount - a.shardCount
})
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
// idx avoids shadowing the outer loop counter i
for idx, candidateEcNode := range candidateEcNodes {
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid)
if shardBits > 0 {
selectedEcNodeIndex = idx
for _, shardId := range shardBits.ShardIds() {
candidateEcNode.shardCount--
picked[shardId] = candidateEcNode.ecNode
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)})
break
}
break
}
}
if selectedEcNodeIndex >= 0 {
ensureSortedEcNodes(candidateEcNodes, selectedEcNodeIndex, func(i, j int) bool {
return candidateEcNodes[i].shardCount > candidateEcNodes[j].shardCount
})
}
}
return picked
}
// collectVolumeIdToEcNodes maps each EC volume ID in the collection to the
// nodes that hold its shards on a hard drive disk.
func (ecb *ecBalancer) collectVolumeIdToEcNodes(collection string) map[needle.VolumeId][]*EcNode {
vidLocations := make(map[needle.VolumeId][]*EcNode)
for _, ecNode := range ecb.ecNodes {
diskInfo, found := ecNode.info.DiskInfos[string(types.HardDriveType)]
if !found {
continue
}
for _, shardInfo := range diskInfo.EcShardInfos {
// ignore if not in current collection
if shardInfo.Collection == collection {
vidLocations[needle.VolumeId(shardInfo.Id)] = append(vidLocations[needle.VolumeId(shardInfo.Id)], ecNode)
}
}
}
return vidLocations
}
// EcBalance balances EC shards for the given collections within data center dc:
// it first balances the volumes of each collection, then evens out shard counts
// among the nodes of each rack. applyBalancing is passed down to the individual
// steps, which use it to decide whether changes are carried out.
func EcBalance(commandEnv *CommandEnv, collections []string, dc string, ecReplicaPlacement *super_block.ReplicaPlacement, maxParallelization int, applyBalancing bool) (err error) {
// collect all ec nodes
allEcNodes, totalFreeEcSlots, err := collectEcNodesForDC(commandEnv, dc)
if err != nil {
return err
}
if totalFreeEcSlots < 1 {
return fmt.Errorf("no free ec shard slots. only %d left", totalFreeEcSlots)
}
ecb := &ecBalancer{
commandEnv: commandEnv,
ecNodes: allEcNodes,
replicaPlacement: ecReplicaPlacement,
applyBalancing: applyBalancing,
maxParallelization: maxParallelization,
}
if len(collections) == 0 {
fmt.Printf("WARNING: No collections to balance EC volumes across.\n")
}
for _, c := range collections {
if err = ecb.balanceEcVolumes(c); err != nil {
return err
}
}
if err := ecb.balanceEcRacks(); err != nil {
return fmt.Errorf("balance ec racks: %w", err)
}
return nil
}
// compileCollectionPattern compiles a regex pattern for collection matching.
// An empty pattern is compiled as "^$", so it matches only the empty collection name.
func compileCollectionPattern(pattern string) (*regexp.Regexp, error) {
if pattern == "" {
// empty pattern matches empty collection
return regexp.Compile("^$")
}
return regexp.Compile(pattern)
}