Fix reporting of EC shard sizes from nodes to masters. (#7835)

SeaweedFS tracks EC shard sizes on topology data stuctures, but this information is never
relayed to master servers :( The end result is that commands reporting disk usage, such
as `volume.list` and `cluster.status`, yield incorrect figures when EC shards are present.

As an example for a simple 5-node test cluster, before...

```
> volume.list
Topology volumeSizeLimit:30000 MB hdd(volume:6/40 active:6 free:33 remote:0)
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9001 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:3  size:88967096  file_count:172  replica_placement:2  version:3  modified_at_second:1766349617
          ec volume id:1 collection: shards:[1 5]
        Disk hdd total size:88967096 file_count:172
      DataNode 192.168.10.111:9001 total size:88967096 file_count:172
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9002 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2  size:77267536  file_count:166  replica_placement:2  version:3  modified_at_second:1766349617
          volume id:3  size:88967096  file_count:172  replica_placement:2  version:3  modified_at_second:1766349617
          ec volume id:1 collection: shards:[0 4]
        Disk hdd total size:166234632 file_count:338
      DataNode 192.168.10.111:9002 total size:166234632 file_count:338
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9003 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:2  size:77267536  file_count:166  replica_placement:2  version:3  modified_at_second:1766349617
          ec volume id:1 collection: shards:[2 6]
        Disk hdd total size:77267536 file_count:166
      DataNode 192.168.10.111:9003 total size:77267536 file_count:166
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9004 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2  size:77267536  file_count:166  replica_placement:2  version:3  modified_at_second:1766349617
          volume id:3  size:88967096  file_count:172  replica_placement:2  version:3  modified_at_second:1766349617
          ec volume id:1 collection: shards:[3 7]
        Disk hdd total size:166234632 file_count:338
      DataNode 192.168.10.111:9004 total size:166234632 file_count:338
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9005 hdd(volume:0/8 active:0 free:8 remote:0)
        Disk hdd(volume:0/8 active:0 free:8 remote:0) id:0
          ec volume id:1 collection: shards:[8 9 10 11 12 13]
        Disk hdd total size:0 file_count:0
    Rack DefaultRack total size:498703896 file_count:1014
  DataCenter DefaultDataCenter total size:498703896 file_count:1014
total size:498703896 file_count:1014
```

...and after:

```
> volume.list
Topology volumeSizeLimit:30000 MB hdd(volume:6/40 active:6 free:33 remote:0)
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9001 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:2  size:81761800  file_count:161  replica_placement:2  version:3  modified_at_second:1766349495
          ec volume id:1 collection: shards:[1 5 9] sizes:[1:8.00 MiB 5:8.00 MiB 9:8.00 MiB] total:24.00 MiB
        Disk hdd total size:81761800 file_count:161
      DataNode 192.168.10.111:9001 total size:81761800 file_count:161
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9002 hdd(volume:1/8 active:1 free:7 remote:0)
        Disk hdd(volume:1/8 active:1 free:7 remote:0) id:0
          volume id:3  size:88678712  file_count:170  replica_placement:2  version:3  modified_at_second:1766349495
          ec volume id:1 collection: shards:[11 12 13] sizes:[11:8.00 MiB 12:8.00 MiB 13:8.00 MiB] total:24.00 MiB
        Disk hdd total size:88678712 file_count:170
      DataNode 192.168.10.111:9002 total size:88678712 file_count:170
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9003 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2  size:81761800  file_count:161  replica_placement:2  version:3  modified_at_second:1766349495
          volume id:3  size:88678712  file_count:170  replica_placement:2  version:3  modified_at_second:1766349495
          ec volume id:1 collection: shards:[0 4 8] sizes:[0:8.00 MiB 4:8.00 MiB 8:8.00 MiB] total:24.00 MiB
        Disk hdd total size:170440512 file_count:331
      DataNode 192.168.10.111:9003 total size:170440512 file_count:331
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9004 hdd(volume:2/8 active:2 free:6 remote:0)
        Disk hdd(volume:2/8 active:2 free:6 remote:0) id:0
          volume id:2  size:81761800  file_count:161  replica_placement:2  version:3  modified_at_second:1766349495
          volume id:3  size:88678712  file_count:170  replica_placement:2  version:3  modified_at_second:1766349495
          ec volume id:1 collection: shards:[2 6 10] sizes:[2:8.00 MiB 6:8.00 MiB 10:8.00 MiB] total:24.00 MiB
        Disk hdd total size:170440512 file_count:331
      DataNode 192.168.10.111:9004 total size:170440512 file_count:331
  DataCenter DefaultDataCenter hdd(volume:6/40 active:6 free:33 remote:0)
    Rack DefaultRack hdd(volume:6/40 active:6 free:33 remote:0)
      DataNode 192.168.10.111:9005 hdd(volume:0/8 active:0 free:8 remote:0)
        Disk hdd(volume:0/8 active:0 free:8 remote:0) id:0
          ec volume id:1 collection: shards:[3 7] sizes:[3:8.00 MiB 7:8.00 MiB] total:16.00 MiB
        Disk hdd total size:0 file_count:0
    Rack DefaultRack total size:511321536 file_count:993
  DataCenter DefaultDataCenter total size:511321536 file_count:993
total size:511321536 file_count:993
```
This commit is contained in:
Lisandro Pin
2025-12-29 04:30:42 +01:00
committed by GitHub
parent 2b529e310d
commit 6b98b52acc
28 changed files with 801 additions and 773 deletions

View File

@@ -310,7 +310,7 @@ func (sp *ClusterStatusPrinter) printVolumeInfo() {
for _, eci := range di.EcShardInfos {
vid := needle.VolumeId(eci.Id)
ecVolumeIds[vid] = true
ecShards += erasure_coding.ShardBits(eci.EcIndexBits).ShardIdCount()
ecShards += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci)
}
}
}

View File

@@ -32,8 +32,8 @@ type EcDisk struct {
diskType string
freeEcSlots int
ecShardCount int // Total EC shards on this disk
// Map of volumeId -> shardBits for shards on this disk
ecShards map[needle.VolumeId]erasure_coding.ShardBits
// Map of volumeId -> ShardsInfo for shards on this disk
ecShards map[needle.VolumeId]*erasure_coding.ShardsInfo
}
type EcNode struct {
@@ -277,14 +277,14 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
return fmt.Errorf("lock is lost")
}
copiedShardIds := []uint32{uint32(shardId)}
copiedShardIds := []erasure_coding.ShardId{shardId}
if applyBalancing {
existingServerAddress := pb.NewServerAddressFromDataNode(existingLocation.info)
// ask destination node to copy shard and the ecx file from source node, and mount it
copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []uint32{uint32(shardId)}, vid, collection, existingServerAddress, destDiskId)
copiedShardIds, err = oneServerCopyAndMountEcShardsFromSource(commandEnv.option.GrpcDialOption, destinationEcNode, []erasure_coding.ShardId{shardId}, vid, collection, existingServerAddress, destDiskId)
if err != nil {
return err
}
@@ -317,8 +317,8 @@ func moveMountedShardToEcNode(commandEnv *CommandEnv, existingLocation *EcNode,
}
func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
targetServer *EcNode, shardIdsToCopy []uint32,
volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []uint32, err error) {
targetServer *EcNode, shardIdsToCopy []erasure_coding.ShardId,
volumeId needle.VolumeId, collection string, existingLocation pb.ServerAddress, destDiskId uint32) (copiedShardIds []erasure_coding.ShardId, err error) {
fmt.Printf("allocate %d.%v %s => %s\n", volumeId, shardIdsToCopy, existingLocation, targetServer.info.Id)
@@ -330,7 +330,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
ShardIds: erasure_coding.ShardIdsToUint32(shardIdsToCopy),
CopyEcxFile: true,
CopyEcjFile: true,
CopyVifFile: true,
@@ -346,7 +346,7 @@ func oneServerCopyAndMountEcShardsFromSource(grpcDialOption grpc.DialOption,
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: shardIdsToCopy,
ShardIds: erasure_coding.ShardIdsToUint32(shardIdsToCopy),
})
if mountErr != nil {
return fmt.Errorf("mount %d.%v on %s : %v\n", volumeId, shardIdsToCopy, targetServer.info.Id, mountErr)
@@ -414,9 +414,8 @@ func swap(data []*CandidateEcNode, i, j int) {
}
func countShards(ecShardInfos []*master_pb.VolumeEcShardInformationMessage) (count int) {
for _, ecShardInfo := range ecShardInfos {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
count += shardBits.ShardIdCount()
for _, eci := range ecShardInfos {
count += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci)
}
return
}
@@ -440,10 +439,9 @@ func countFreeShardSlots(dn *master_pb.DataNodeInfo, diskType types.DiskType) (c
func (ecNode *EcNode) localShardIdCount(vid uint32) int {
for _, diskInfo := range ecNode.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
if vid == ecShardInfo.Id {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
return shardBits.ShardIdCount()
for _, eci := range diskInfo.EcShardInfos {
if vid == eci.Id {
return erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(eci)
}
}
}
@@ -483,18 +481,18 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
}
// Group EC shards by disk_id
diskShards := make(map[uint32]map[needle.VolumeId]erasure_coding.ShardBits)
diskShards := make(map[uint32]map[needle.VolumeId]*erasure_coding.ShardsInfo)
for _, diskInfo := range dn.DiskInfos {
if diskInfo == nil {
continue
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
diskId := ecShardInfo.DiskId
for _, eci := range diskInfo.EcShardInfos {
diskId := eci.DiskId
if diskShards[diskId] == nil {
diskShards[diskId] = make(map[needle.VolumeId]erasure_coding.ShardBits)
diskShards[diskId] = make(map[needle.VolumeId]*erasure_coding.ShardsInfo)
}
vid := needle.VolumeId(ecShardInfo.Id)
diskShards[diskId][vid] = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
vid := needle.VolumeId(eci.Id)
diskShards[diskId][vid] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci)
}
}
@@ -508,11 +506,11 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
for diskId, diskTypeStr := range allDiskIds {
shards := diskShards[diskId]
if shards == nil {
shards = make(map[needle.VolumeId]erasure_coding.ShardBits)
shards = make(map[needle.VolumeId]*erasure_coding.ShardsInfo)
}
totalShardCount := 0
for _, shardBits := range shards {
totalShardCount += shardBits.ShardIdCount()
for _, shardsInfo := range shards {
totalShardCount += shardsInfo.Count()
}
ecNode.disks[diskId] = &EcDisk{
@@ -530,7 +528,7 @@ func collectEcVolumeServersByDc(topo *master_pb.TopologyInfo, selectedDataCenter
return
}
func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []uint32) error {
func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeDeletedShardIds []erasure_coding.ShardId) error {
fmt.Printf("delete %d.%v from %s\n", volumeId, toBeDeletedShardIds, sourceLocation)
@@ -538,27 +536,27 @@ func sourceServerDeleteEcShards(grpcDialOption grpc.DialOption, collection strin
_, deleteErr := volumeServerClient.VolumeEcShardsDelete(context.Background(), &volume_server_pb.VolumeEcShardsDeleteRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeDeletedShardIds,
ShardIds: erasure_coding.ShardIdsToUint32(toBeDeletedShardIds),
})
return deleteErr
})
}
func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []uint32) error {
func unmountEcShards(grpcDialOption grpc.DialOption, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeUnmountedhardIds []erasure_coding.ShardId) error {
fmt.Printf("unmount %d.%v from %s\n", volumeId, toBeUnmountedhardIds, sourceLocation)
return operation.WithVolumeServerClient(false, sourceLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, deleteErr := volumeServerClient.VolumeEcShardsUnmount(context.Background(), &volume_server_pb.VolumeEcShardsUnmountRequest{
VolumeId: uint32(volumeId),
ShardIds: toBeUnmountedhardIds,
ShardIds: erasure_coding.ShardIdsToUint32(toBeUnmountedhardIds),
})
return deleteErr
})
}
func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []uint32) error {
func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress, toBeMountedhardIds []erasure_coding.ShardId) error {
fmt.Printf("mount %d.%v on %s\n", volumeId, toBeMountedhardIds, sourceLocation)
@@ -566,7 +564,7 @@ func mountEcShards(grpcDialOption grpc.DialOption, collection string, volumeId n
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(volumeId),
Collection: collection,
ShardIds: toBeMountedhardIds,
ShardIds: erasure_coding.ShardIdsToUint32(toBeMountedhardIds),
})
return mountErr
})
@@ -580,33 +578,35 @@ func ceilDivide(a, b int) int {
return (a / b) + r
}
func findEcVolumeShards(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) erasure_coding.ShardBits {
func findEcVolumeShardsInfo(ecNode *EcNode, vid needle.VolumeId, diskType types.DiskType) *erasure_coding.ShardsInfo {
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
return erasure_coding.ShardBits(shardInfo.EcIndexBits)
return erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo)
}
}
}
return 0
// Returns an empty ShardsInfo struct on failure, to avoid potential nil dereferences.
return erasure_coding.NewShardsInfo()
}
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []uint32, diskType types.DiskType) *EcNode {
// TODO: simplify me
func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode {
foundVolume := false
diskInfo, found := ecNode.info.DiskInfos[string(diskType)]
if found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
newShardBits := oldShardBits
for _, ecsi := range diskInfo.EcShardInfos {
if needle.VolumeId(ecsi.Id) == vid {
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecsi)
oldShardCount := si.Count()
for _, shardId := range shardIds {
newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
si.Set(shardId, 0)
}
shardInfo.EcIndexBits = uint32(newShardBits)
ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
ecsi.EcIndexBits = si.Bitmap()
ecsi.ShardSizes = si.SizesInt64()
ecNode.freeEcSlot -= si.Count() - oldShardCount
foundVolume = true
break
}
@@ -619,34 +619,36 @@ func (ecNode *EcNode) addEcVolumeShards(vid needle.VolumeId, collection string,
}
if !foundVolume {
var newShardBits erasure_coding.ShardBits
for _, shardId := range shardIds {
newShardBits = newShardBits.AddShardId(erasure_coding.ShardId(shardId))
si := erasure_coding.NewShardsInfo()
for _, id := range shardIds {
si.Set(id, 0)
}
diskInfo.EcShardInfos = append(diskInfo.EcShardInfos, &master_pb.VolumeEcShardInformationMessage{
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(newShardBits),
EcIndexBits: si.Bitmap(),
ShardSizes: si.SizesInt64(),
DiskType: string(diskType),
})
ecNode.freeEcSlot -= len(shardIds)
ecNode.freeEcSlot -= si.Count()
}
return ecNode
}
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []uint32, diskType types.DiskType) *EcNode {
func (ecNode *EcNode) deleteEcVolumeShards(vid needle.VolumeId, shardIds []erasure_coding.ShardId, diskType types.DiskType) *EcNode {
if diskInfo, found := ecNode.info.DiskInfos[string(diskType)]; found {
for _, shardInfo := range diskInfo.EcShardInfos {
if needle.VolumeId(shardInfo.Id) == vid {
oldShardBits := erasure_coding.ShardBits(shardInfo.EcIndexBits)
newShardBits := oldShardBits
for _, eci := range diskInfo.EcShardInfos {
if needle.VolumeId(eci.Id) == vid {
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(eci)
oldCount := si.Count()
for _, shardId := range shardIds {
newShardBits = newShardBits.RemoveShardId(erasure_coding.ShardId(shardId))
si.Delete(shardId)
}
shardInfo.EcIndexBits = uint32(newShardBits)
ecNode.freeEcSlot -= newShardBits.ShardIdCount() - oldShardBits.ShardIdCount()
eci.EcIndexBits = si.Bitmap()
eci.ShardSizes = si.SizesInt64()
ecNode.freeEcSlot -= si.Count() - oldCount
}
}
}
@@ -754,8 +756,8 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
// Use MaxShardCount (32) to support custom EC ratios
shardToLocations := make([][]*EcNode, erasure_coding.MaxShardCount)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
for _, shardId := range shardBits.ShardIds() {
si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType)
for _, shardId := range si.Ids() {
shardToLocations[shardId] = append(shardToLocations[shardId], ecNode)
}
}
@@ -769,7 +771,7 @@ func (ecb *ecBalancer) doDeduplicateEcShards(collection string, vid needle.Volum
continue
}
duplicatedShardIds := []uint32{uint32(shardId)}
duplicatedShardIds := []erasure_coding.ShardId{erasure_coding.ShardId(shardId)}
for _, ecNode := range ecNodes[1:] {
if err := unmountEcShards(ecb.commandEnv.option.GrpcDialOption, vid, pb.NewServerAddressFromDataNode(ecNode.info), duplicatedShardIds); err != nil {
return err
@@ -799,8 +801,11 @@ func (ecb *ecBalancer) balanceEcShardsAcrossRacks(collection string) error {
func countShardsByRack(vid needle.VolumeId, locations []*EcNode, diskType types.DiskType) map[string]int {
return groupByCount(locations, func(ecNode *EcNode) (id string, count int) {
shardBits := findEcVolumeShards(ecNode, vid, diskType)
return string(ecNode.rack), shardBits.ShardIdCount()
id = string(ecNode.rack)
if si := findEcVolumeShardsInfo(ecNode, vid, diskType); si != nil {
count = si.Count()
}
return
})
}
@@ -809,9 +814,9 @@ func shardsByTypePerRack(vid needle.VolumeId, locations []*EcNode, diskType type
dataPerRack = make(map[string][]erasure_coding.ShardId)
parityPerRack = make(map[string][]erasure_coding.ShardId)
for _, ecNode := range locations {
shardBits := findEcVolumeShards(ecNode, vid, diskType)
si := findEcVolumeShardsInfo(ecNode, vid, diskType)
rackId := string(ecNode.rack)
for _, shardId := range shardBits.ShardIds() {
for _, shardId := range si.Ids() {
if int(shardId) < dataShards {
dataPerRack[rackId] = append(dataPerRack[rackId], shardId)
} else {
@@ -891,8 +896,8 @@ func (ecb *ecBalancer) balanceShardTypeAcrossRacks(
shardId := shards[i]
// Find which node has this shard
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
if shardBits.HasShardId(shardId) {
si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType)
if si.Has(shardId) {
shardsToMove[shardId] = ecNode
break
}
@@ -1052,10 +1057,10 @@ func (ecb *ecBalancer) balanceEcShardsWithinRacks(collection string) error {
func (ecb *ecBalancer) doBalanceEcShardsWithinOneRack(averageShardsPerEcNode int, collection string, vid needle.VolumeId, existingLocations, possibleDestinationEcNodes []*EcNode) error {
for _, ecNode := range existingLocations {
shardBits := findEcVolumeShards(ecNode, vid, ecb.diskType)
overLimitCount := shardBits.ShardIdCount() - averageShardsPerEcNode
si := findEcVolumeShardsInfo(ecNode, vid, ecb.diskType)
overLimitCount := si.Count() - averageShardsPerEcNode
for _, shardId := range shardBits.ShardIds() {
for _, shardId := range si.Ids() {
if overLimitCount <= 0 {
break
@@ -1102,7 +1107,7 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
return
}
for _, ecShardInfo := range diskInfo.EcShardInfos {
count += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
count += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo)
}
return ecNode.info.Id, count
})
@@ -1132,30 +1137,32 @@ func (ecb *ecBalancer) doBalanceEcRack(ecRack *EcRack) error {
}
if fullDiskInfo, found := fullNode.info.DiskInfos[string(ecb.diskType)]; found {
for _, shards := range fullDiskInfo.EcShardInfos {
if _, found := emptyNodeIds[shards.Id]; !found {
for _, shardId := range erasure_coding.ShardBits(shards.EcIndexBits).ShardIds() {
vid := needle.VolumeId(shards.Id)
// For balancing, strictly require matching disk type
destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
if _, found := emptyNodeIds[shards.Id]; found {
continue
}
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shards)
for _, shardId := range si.Ids() {
vid := needle.VolumeId(shards.Id)
// For balancing, strictly require matching disk type
destDiskId := pickBestDiskOnNode(emptyNode, vid, ecb.diskType, true)
if destDiskId > 0 {
fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
} else {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
}
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
if err != nil {
return err
}
ecNodeIdToShardCount[emptyNode.info.Id]++
ecNodeIdToShardCount[fullNode.info.Id]--
hasMove = true
break
if destDiskId > 0 {
fmt.Printf("%s moves ec shards %d.%d to %s (disk %d)\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id, destDiskId)
} else {
fmt.Printf("%s moves ec shards %d.%d to %s\n", fullNode.info.Id, shards.Id, shardId, emptyNode.info.Id)
}
err := moveMountedShardToEcNode(ecb.commandEnv, fullNode, shards.Collection, vid, shardId, emptyNode, destDiskId, ecb.applyBalancing, ecb.diskType)
if err != nil {
return err
}
ecNodeIdToShardCount[emptyNode.info.Id]++
ecNodeIdToShardCount[fullNode.info.Id]--
hasMove = true
break
}
break
}
}
}
@@ -1174,7 +1181,11 @@ func (ecb *ecBalancer) pickEcNodeToBalanceShardsInto(vid needle.VolumeId, existi
nodeShards := map[*EcNode]int{}
for _, node := range possibleDestinations {
nodeShards[node] = findEcVolumeShards(node, vid, ecb.diskType).ShardIdCount()
count := 0
if si := findEcVolumeShardsInfo(node, vid, ecb.diskType); si != nil {
count = si.Count()
}
nodeShards[node] = count
}
targets := []*EcNode{}
@@ -1244,8 +1255,8 @@ func diskDistributionScore(ecNode *EcNode, vid needle.VolumeId) int {
// Lower total means more room for new shards
score := 0
for _, disk := range ecNode.disks {
if shardBits, ok := disk.ecShards[vid]; ok {
score += shardBits.ShardIdCount() * 10 // Weight shards of this volume heavily
if si, ok := disk.ecShards[vid]; ok {
score += si.Count() * 10 // Weight shards of this volume heavily
}
score += disk.ecShardCount // Also consider total shards on disk
}
@@ -1272,8 +1283,8 @@ func pickBestDiskOnNode(ecNode *EcNode, vid needle.VolumeId, diskType types.Disk
// Check if this volume already has shards on this disk
existingShards := 0
if shardBits, ok := disk.ecShards[vid]; ok {
existingShards = shardBits.ShardIdCount()
if si, ok := disk.ecShards[vid]; ok {
existingShards = si.Count()
}
// Score: prefer disks with fewer total shards and fewer shards of this volume
@@ -1333,11 +1344,11 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, disk
picked := make(map[erasure_coding.ShardId]*EcNode)
var candidateEcNodes []*CandidateEcNode
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid, diskType)
if shardBits.ShardIdCount() > 0 {
si := findEcVolumeShardsInfo(ecNode, vid, diskType)
if si.Count() > 0 {
candidateEcNodes = append(candidateEcNodes, &CandidateEcNode{
ecNode: ecNode,
shardCount: shardBits.ShardIdCount(),
shardCount: si.Count(),
})
}
}
@@ -1347,13 +1358,13 @@ func pickNEcShardsToMoveFrom(ecNodes []*EcNode, vid needle.VolumeId, n int, disk
for i := 0; i < n; i++ {
selectedEcNodeIndex := -1
for i, candidateEcNode := range candidateEcNodes {
shardBits := findEcVolumeShards(candidateEcNode.ecNode, vid, diskType)
if shardBits > 0 {
si := findEcVolumeShardsInfo(candidateEcNode.ecNode, vid, diskType)
if si.Count() > 0 {
selectedEcNodeIndex = i
for _, shardId := range shardBits.ShardIds() {
for _, shardId := range si.Ids() {
candidateEcNode.shardCount--
picked[shardId] = candidateEcNode.ecNode
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []uint32{uint32(shardId)}, diskType)
candidateEcNode.ecNode.deleteEcVolumeShards(vid, []erasure_coding.ShardId{shardId}, diskType)
break
}
break

View File

@@ -108,12 +108,12 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
}
// find volume location
nodeToEcIndexBits := collectEcNodeShardBits(topoInfo, vid, diskType)
nodeToEcShardsInfo := collectEcNodeShardsInfo(topoInfo, vid, diskType)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcIndexBits)
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo)
// collect ec shards to the server with most space
targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcIndexBits, collection, vid)
targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid)
if err != nil {
return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
}
@@ -124,13 +124,13 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
// Special case: if the EC index has no live entries, decoding is a no-op.
// Just purge EC shards and return success without generating/mounting an empty volume.
if isEcDecodeEmptyVolumeErr(err) {
return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcIndexBits, vid)
return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid)
}
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
}
// delete the previous ec shards
err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcIndexBits, vid)
err = mountVolumeAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, targetNodeLocation, nodeToEcShardsInfo, vid)
if err != nil {
return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
}
@@ -150,24 +150,24 @@ func isEcDecodeEmptyVolumeErr(err error) bool {
return strings.Contains(st.Message(), erasure_coding.EcNoLiveEntriesSubstring)
}
func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
func unmountAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
return unmountAndDeleteEcShardsWithPrefix("unmountAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
}
func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
ewg := NewErrorWaitGroup(len(nodeToEcIndexBits))
func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialOption, collection string, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
ewg := NewErrorWaitGroup(len(nodeToShardsInfo))
// unmount and delete ec shards in parallel (one goroutine per location)
for location, ecIndexBits := range nodeToEcIndexBits {
location, ecIndexBits := location, ecIndexBits // capture loop variables for goroutine
for location, si := range nodeToShardsInfo {
location, si := location, si // capture loop variables for goroutine
ewg.Add(func() error {
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
if err := unmountEcShards(grpcDialOption, vid, location, ecIndexBits.ToUint32Slice()); err != nil {
fmt.Printf("unmount ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
if err := unmountEcShards(grpcDialOption, vid, location, si.Ids()); err != nil {
return fmt.Errorf("%s unmount ec volume %d on %s: %w", prefix, vid, location, err)
}
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, ecIndexBits.ShardIds())
if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, ecIndexBits.ToUint32Slice()); err != nil {
fmt.Printf("delete ec volume %d on %s has shards: %+v\n", vid, location, si.Ids())
if err := sourceServerDeleteEcShards(grpcDialOption, collection, vid, location, si.Ids()); err != nil {
return fmt.Errorf("%s delete ec volume %d on %s: %w", prefix, vid, location, err)
}
return nil
@@ -176,7 +176,7 @@ func unmountAndDeleteEcShardsWithPrefix(prefix string, grpcDialOption grpc.DialO
return ewg.Wait()
}
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, vid needle.VolumeId) error {
func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection string, targetNodeLocation pb.ServerAddress, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, vid needle.VolumeId) error {
// mount volume
if err := operation.WithVolumeServerClient(false, targetNodeLocation, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
@@ -188,7 +188,7 @@ func mountVolumeAndDeleteEcShards(grpcDialOption grpc.DialOption, collection str
return fmt.Errorf("mountVolumeAndDeleteEcShards mount volume %d on %s: %v", vid, targetNodeLocation, err)
}
return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToEcIndexBits, vid)
return unmountAndDeleteEcShardsWithPrefix("mountVolumeAndDeleteEcShards", grpcDialOption, collection, nodeToShardsInfo, vid)
}
func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, collection string, sourceVolumeServer pb.ServerAddress) error {
@@ -207,57 +207,57 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
}
func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddress]erasure_coding.ShardBits, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
maxShardCount := 0
var existingEcIndexBits erasure_coding.ShardBits
for loc, ecIndexBits := range nodeToEcIndexBits {
toBeCopiedShardCount := ecIndexBits.MinusParityShards().ShardIdCount()
existingShardsInfo := erasure_coding.NewShardsInfo()
for loc, si := range nodeToShardsInfo {
toBeCopiedShardCount := si.MinusParityShards().Count()
if toBeCopiedShardCount > maxShardCount {
maxShardCount = toBeCopiedShardCount
targetNodeLocation = loc
existingEcIndexBits = ecIndexBits
existingShardsInfo = si
}
}
fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToEcIndexBits)
fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo)
var copiedEcIndexBits erasure_coding.ShardBits
for loc, ecIndexBits := range nodeToEcIndexBits {
copiedShardsInfo := erasure_coding.NewShardsInfo()
for loc, si := range nodeToShardsInfo {
if loc == targetNodeLocation {
continue
}
needToCopyEcIndexBits := ecIndexBits.Minus(existingEcIndexBits).MinusParityShards()
if needToCopyEcIndexBits.ShardIdCount() == 0 {
needToCopyShardsInfo := si.Minus(existingShardsInfo).MinusParityShards()
if needToCopyShardsInfo.Count() == 0 {
continue
}
err = operation.WithVolumeServerClient(false, targetNodeLocation, commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation)
fmt.Printf("copy %d.%v %s => %s\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation)
_, copyErr := volumeServerClient.VolumeEcShardsCopy(context.Background(), &volume_server_pb.VolumeEcShardsCopyRequest{
VolumeId: uint32(vid),
Collection: collection,
ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
ShardIds: needToCopyShardsInfo.IdsUint32(),
CopyEcxFile: false,
CopyEcjFile: true,
CopyVifFile: true,
SourceDataNode: string(loc),
})
if copyErr != nil {
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), loc, targetNodeLocation, copyErr)
return fmt.Errorf("copy %d.%v %s => %s : %v\n", vid, needToCopyShardsInfo.Ids(), loc, targetNodeLocation, copyErr)
}
fmt.Printf("mount %d.%v on %s\n", vid, needToCopyEcIndexBits.ShardIds(), targetNodeLocation)
fmt.Printf("mount %d.%v on %s\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation)
_, mountErr := volumeServerClient.VolumeEcShardsMount(context.Background(), &volume_server_pb.VolumeEcShardsMountRequest{
VolumeId: uint32(vid),
Collection: collection,
ShardIds: needToCopyEcIndexBits.ToUint32Slice(),
ShardIds: needToCopyShardsInfo.IdsUint32(),
})
if mountErr != nil {
return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyEcIndexBits.ShardIds(), targetNodeLocation, mountErr)
return fmt.Errorf("mount %d.%v on %s : %v\n", vid, needToCopyShardsInfo.Ids(), targetNodeLocation, mountErr)
}
return nil
@@ -267,14 +267,12 @@ func collectEcShards(commandEnv *CommandEnv, nodeToEcIndexBits map[pb.ServerAddr
break
}
copiedEcIndexBits = copiedEcIndexBits.Plus(needToCopyEcIndexBits)
copiedShardsInfo.Add(needToCopyShardsInfo)
}
nodeToEcIndexBits[targetNodeLocation] = existingEcIndexBits.Plus(copiedEcIndexBits)
nodeToShardsInfo[targetNodeLocation] = existingShardsInfo.Plus(copiedShardsInfo)
return targetNodeLocation, err
}
func lookupVolumeIds(commandEnv *CommandEnv, volumeIds []string) (volumeIdLocations []*master_pb.LookupVolumeResponse_VolumeIdLocation, err error) {
@@ -314,18 +312,17 @@ func collectEcShardIds(topoInfo *master_pb.TopologyInfo, collectionPattern strin
return
}
func collectEcNodeShardBits(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]erasure_coding.ShardBits {
nodeToEcIndexBits := make(map[pb.ServerAddress]erasure_coding.ShardBits)
func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.VolumeId, diskType types.DiskType) map[pb.ServerAddress]*erasure_coding.ShardsInfo {
res := make(map[pb.ServerAddress]*erasure_coding.ShardsInfo)
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
for _, v := range diskInfo.EcShardInfos {
if v.Id == uint32(vid) {
nodeToEcIndexBits[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardBits(v.EcIndexBits)
res[pb.NewServerAddressFromDataNode(dn)] = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(v)
}
}
}
})
return nodeToEcIndexBits
return res
}

View File

@@ -278,10 +278,7 @@ func doEcEncode(commandEnv *CommandEnv, writer io.Writer, volumeIdToCollection m
}
// mount all ec shards for the converted volume
shardIds := make([]uint32, erasure_coding.TotalShardsCount)
for i := range shardIds {
shardIds[i] = uint32(i)
}
shardIds := erasure_coding.AllShardIds()
ewg.Reset()
for _, vid := range volumeIds {

View File

@@ -149,8 +149,7 @@ func (erb *ecRebuilder) countLocalShards(node *EcNode, collection string, volume
for _, diskInfo := range node.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
return len(shardBits.ShardIds())
return erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo)
}
}
}
@@ -266,7 +265,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
fmt.Printf("rebuildOneEcVolume %s %d\n", collection, volumeId)
// collect shard files to rebuilder local disk
var generatedShardIds []uint32
var generatedShardIds []erasure_coding.ShardId
copiedShardIds, _, err := erb.prepareDataToRecover(rebuilder, collection, volumeId, locations)
if err != nil {
return err
@@ -306,7 +305,7 @@ func (erb *ecRebuilder) rebuildOneEcVolume(collection string, volumeId needle.Vo
return nil
}
func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []uint32, err error) {
func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle.VolumeId, sourceLocation pb.ServerAddress) (rebuiltShardIds []erasure_coding.ShardId, err error) {
err = operation.WithVolumeServerClient(false, sourceLocation, erb.commandEnv.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
resp, rebuildErr := volumeServerClient.VolumeEcShardsRebuild(context.Background(), &volume_server_pb.VolumeEcShardsRebuildRequest{
@@ -314,34 +313,35 @@ func (erb *ecRebuilder) generateMissingShards(collection string, volumeId needle
Collection: collection,
})
if rebuildErr == nil {
rebuiltShardIds = resp.RebuiltShardIds
rebuiltShardIds = erasure_coding.Uint32ToShardIds(resp.RebuiltShardIds)
}
return rebuildErr
})
return
}
func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations) (copiedShardIds []uint32, localShardIds []uint32, err error) {
func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection string, volumeId needle.VolumeId, locations EcShardLocations) (copiedShardIds []erasure_coding.ShardId, localShardIds []erasure_coding.ShardId, err error) {
needEcxFile := true
var localShardBits erasure_coding.ShardBits
localShardsInfo := erasure_coding.NewShardsInfo()
for _, diskInfo := range rebuilder.info.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
if ecShardInfo.Collection == collection && needle.VolumeId(ecShardInfo.Id) == volumeId {
needEcxFile = false
localShardBits = erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
localShardsInfo = erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo)
}
}
}
for shardId, ecNodes := range locations {
for i, ecNodes := range locations {
shardId := erasure_coding.ShardId(i)
if len(ecNodes) == 0 {
erb.write("missing shard %d.%d\n", volumeId, shardId)
continue
}
if localShardBits.HasShardId(erasure_coding.ShardId(shardId)) {
localShardIds = append(localShardIds, uint32(shardId))
if localShardsInfo.Has(shardId) {
localShardIds = append(localShardIds, shardId)
erb.write("use existing shard %d.%d\n", volumeId, shardId)
continue
}
@@ -368,7 +368,7 @@ func (erb *ecRebuilder) prepareDataToRecover(rebuilder *EcNode, collection strin
erb.write("%s failed to copy %d.%d from %s: %v\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id, copyErr)
} else {
erb.write("%s copied %d.%d from %s\n", rebuilder.info.Id, volumeId, shardId, ecNodes[0].info.Id)
copiedShardIds = append(copiedShardIds, uint32(shardId))
copiedShardIds = append(copiedShardIds, shardId)
}
}
@@ -394,7 +394,7 @@ func (ecShardMap EcShardMap) registerEcNode(ecNode *EcNode, collection string) {
existing = make([][]*EcNode, erasure_coding.MaxShardCount)
ecShardMap[needle.VolumeId(shardInfo.Id)] = existing
}
for _, shardId := range erasure_coding.ShardBits(shardInfo.EcIndexBits).ShardIds() {
for _, shardId := range erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(shardInfo).Ids() {
existing[shardId] = append(existing[shardId], ecNode)
}
}

View File

@@ -15,9 +15,9 @@ func TestEcShardMapRegister(t *testing.T) {
// Create test nodes with EC shards
node1 := newEcNode("dc1", "rack1", "node1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6})
node2 := newEcNode("dc1", "rack1", "node2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13})
ecShardMap.registerEcNode(node1, "c1")
ecShardMap.registerEcNode(node2, "c1")
@@ -51,15 +51,15 @@ func TestEcShardMapRegister(t *testing.T) {
func TestEcShardMapShardCount(t *testing.T) {
testCases := []struct {
name string
shardIds []uint32
shardIds []erasure_coding.ShardId
expectedCount int
}{
{"all shards", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, 14},
{"data shards only", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10},
{"parity shards only", []uint32{10, 11, 12, 13}, 4},
{"missing some shards", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8}, 9},
{"single shard", []uint32{0}, 1},
{"no shards", []uint32{}, 0},
{"all shards", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, 14},
{"data shards only", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, 10},
{"parity shards only", []erasure_coding.ShardId{10, 11, 12, 13}, 4},
{"missing some shards", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8}, 9},
{"single shard", []erasure_coding.ShardId{0}, 1},
{"no shards", []erasure_coding.ShardId{}, 0},
}
for _, tc := range testCases {
@@ -85,7 +85,7 @@ func TestRebuildEcVolumesInsufficientShards(t *testing.T) {
// Create a volume with insufficient shards (less than DataShardsCount)
node1 := newEcNode("dc1", "rack1", "node1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4}) // Only 5 shards
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4}) // Only 5 shards
erb := &ecRebuilder{
commandEnv: &CommandEnv{
@@ -114,7 +114,7 @@ func TestRebuildEcVolumesCompleteVolume(t *testing.T) {
// Create a volume with all shards
node1 := newEcNode("dc1", "rack1", "node1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13})
erb := &ecRebuilder{
commandEnv: &CommandEnv{
@@ -146,7 +146,7 @@ func TestRebuildEcVolumesInsufficientSpace(t *testing.T) {
// Node has 10 local shards, missing 4 shards (10,11,12,13), so needs 4 free slots
// Set free slots to 3 (insufficient)
node1 := newEcNode("dc1", "rack1", "node1", 3). // Only 3 free slots, need 4
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
erb := &ecRebuilder{
commandEnv: &CommandEnv{
@@ -180,11 +180,11 @@ func TestMultipleNodesWithShards(t *testing.T) {
// Create 3 nodes with different shards
node1 := newEcNode("dc1", "rack1", "node1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3})
node2 := newEcNode("dc1", "rack1", "node2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{4, 5, 6, 7})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{4, 5, 6, 7})
node3 := newEcNode("dc1", "rack1", "node3", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{8, 9})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{8, 9})
ecShardMap.registerEcNode(node1, "c1")
ecShardMap.registerEcNode(node2, "c1")
@@ -222,9 +222,9 @@ func TestDuplicateShards(t *testing.T) {
// Create 2 nodes with overlapping shards (both have shard 0)
node1 := newEcNode("dc1", "rack1", "node1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3})
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3})
node2 := newEcNode("dc1", "rack1", "node2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 4, 5, 6}) // Duplicate shard 0
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 4, 5, 6}) // Duplicate shard 0
ecShardMap.registerEcNode(node1, "c1")
ecShardMap.registerEcNode(node2, "c1")

View File

@@ -12,8 +12,8 @@ import (
func TestCommandEcBalanceSmall(t *testing.T) {
ecb := &ecBalancer{
ecNodes: []*EcNode{
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
},
applyBalancing: false,
diskType: types.HardDriveType,
@@ -26,11 +26,11 @@ func TestCommandEcBalanceNothingToMove(t *testing.T) {
ecb := &ecBalancer{
ecNodes: []*EcNode{
newEcNode("dc1", "rack1", "dn1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack1", "dn2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}),
},
applyBalancing: false,
diskType: types.HardDriveType,
@@ -43,11 +43,11 @@ func TestCommandEcBalanceAddNewServers(t *testing.T) {
ecb := &ecBalancer{
ecNodes: []*EcNode{
newEcNode("dc1", "rack1", "dn1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack1", "dn2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}),
newEcNode("dc1", "rack1", "dn3", 100),
newEcNode("dc1", "rack1", "dn4", 100),
},
@@ -62,11 +62,11 @@ func TestCommandEcBalanceAddNewRacks(t *testing.T) {
ecb := &ecBalancer{
ecNodes: []*EcNode{
newEcNode("dc1", "rack1", "dn1", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack1", "dn2", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{7, 8, 9, 10, 11, 12, 13}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7, 8, 9, 10, 11, 12, 13}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6}),
newEcNode("dc1", "rack2", "dn3", 100),
newEcNode("dc1", "rack2", "dn4", 100),
},
@@ -81,36 +81,36 @@ func TestCommandEcBalanceVolumeEvenButRackUneven(t *testing.T) {
ecb := ecBalancer{
ecNodes: []*EcNode{
newEcNode("dc1", "rack1", "dn_shared", 100).
addEcVolumeAndShardsForTest(1, "c1", []uint32{0}).
addEcVolumeAndShardsForTest(2, "c1", []uint32{0}),
addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0}).
addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0}),
newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{1}),
newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{2}),
newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{3}),
newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{4}),
newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{5}),
newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{6}),
newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{7}),
newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{8}),
newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{9}),
newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{10}),
newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{11}),
newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{12}),
newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{13}),
newEcNode("dc1", "rack1", "dn_a1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{1}),
newEcNode("dc1", "rack1", "dn_a2", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{2}),
newEcNode("dc1", "rack1", "dn_a3", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{3}),
newEcNode("dc1", "rack1", "dn_a4", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{4}),
newEcNode("dc1", "rack1", "dn_a5", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{5}),
newEcNode("dc1", "rack1", "dn_a6", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{6}),
newEcNode("dc1", "rack1", "dn_a7", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{7}),
newEcNode("dc1", "rack1", "dn_a8", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{8}),
newEcNode("dc1", "rack1", "dn_a9", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{9}),
newEcNode("dc1", "rack1", "dn_a10", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{10}),
newEcNode("dc1", "rack1", "dn_a11", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{11}),
newEcNode("dc1", "rack1", "dn_a12", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{12}),
newEcNode("dc1", "rack1", "dn_a13", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{13}),
newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{1}),
newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{2}),
newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{3}),
newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{4}),
newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{5}),
newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{6}),
newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{7}),
newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{8}),
newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{9}),
newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{10}),
newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{11}),
newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{12}),
newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{13}),
newEcNode("dc1", "rack1", "dn_b1", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{1}),
newEcNode("dc1", "rack1", "dn_b2", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{2}),
newEcNode("dc1", "rack1", "dn_b3", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{3}),
newEcNode("dc1", "rack1", "dn_b4", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{4}),
newEcNode("dc1", "rack1", "dn_b5", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{5}),
newEcNode("dc1", "rack1", "dn_b6", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{6}),
newEcNode("dc1", "rack1", "dn_b7", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{7}),
newEcNode("dc1", "rack1", "dn_b8", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{8}),
newEcNode("dc1", "rack1", "dn_b9", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{9}),
newEcNode("dc1", "rack1", "dn_b10", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{10}),
newEcNode("dc1", "rack1", "dn_b11", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{11}),
newEcNode("dc1", "rack1", "dn_b12", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{12}),
newEcNode("dc1", "rack1", "dn_b13", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{13}),
newEcNode("dc1", "rack1", "dn3", 100),
},
@@ -134,7 +134,7 @@ func newEcNode(dc string, rack string, dataNodeId string, freeEcSlot int) *EcNod
}
}
func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []uint32) *EcNode {
func (ecNode *EcNode) addEcVolumeAndShardsForTest(vid uint32, collection string, shardIds []erasure_coding.ShardId) *EcNode {
return ecNode.addEcVolumeShards(needle.VolumeId(vid), collection, shardIds, types.HardDriveType)
}
@@ -146,7 +146,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) {
ecb := &ecBalancer{
ecNodes: []*EcNode{
// All shards initially on rack1/dn1
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
// Empty nodes on other racks
newEcNode("dc1", "rack2", "dn2", 100),
newEcNode("dc1", "rack3", "dn3", 100),
@@ -163,7 +163,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) {
// After balancing (dry-run), verify the PLANNED distribution by checking what moves were proposed
// The ecb.ecNodes state is updated during dry-run to track planned moves
vid := needle.VolumeId(1)
dataShardCount := erasure_coding.DataShardsCount // 10
dataShardCount := erasure_coding.DataShardsCount // 10
parityShardCount := erasure_coding.ParityShardsCount // 4
// Count data and parity shards per rack based on current (updated) state
@@ -172,7 +172,7 @@ func TestCommandEcBalanceEvenDataAndParityDistribution(t *testing.T) {
// With 6 racks:
// - Data shards (10): max 2 per rack (ceil(10/6) = 2)
// - Parity shards (4): max 1 per rack (ceil(4/6) = 1)
maxDataPerRack := ceilDivide(dataShardCount, 6) // 2
maxDataPerRack := ceilDivide(dataShardCount, 6) // 2
maxParityPerRack := ceilDivide(parityShardCount, 6) // 1
// Verify no rack has more than max data shards
@@ -229,8 +229,8 @@ func countDataAndParityShardsPerRack(ecNodes []*EcNode, vid needle.VolumeId, dat
parityPerRack = make(map[string]int)
for _, ecNode := range ecNodes {
shardBits := findEcVolumeShards(ecNode, vid, types.HardDriveType)
for _, shardId := range shardBits.ShardIds() {
si := findEcVolumeShardsInfo(ecNode, vid, types.HardDriveType)
for _, shardId := range si.Ids() {
rackId := string(ecNode.rack)
if int(shardId) < dataShardCount {
dataPerRack[rackId]++
@@ -249,9 +249,9 @@ func TestCommandEcBalanceMultipleVolumesEvenDistribution(t *testing.T) {
ecb := &ecBalancer{
ecNodes: []*EcNode{
// Volume 1: all shards on rack1
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack1", "dn1", 100).addEcVolumeAndShardsForTest(1, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
// Volume 2: all shards on rack2
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
newEcNode("dc1", "rack2", "dn2", 100).addEcVolumeAndShardsForTest(2, "c1", []erasure_coding.ShardId{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}),
// Empty nodes on other racks
newEcNode("dc1", "rack3", "dn3", 100),
newEcNode("dc1", "rack4", "dn4", 100),

View File

@@ -255,7 +255,7 @@ func capacityByMaxVolumeCount(diskType types.DiskType) CapacityFunc {
}
var ecShardCount int
for _, ecShardInfo := range diskInfo.EcShardInfos {
ecShardCount += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo)
}
return float64(diskInfo.MaxVolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount
}
@@ -269,7 +269,7 @@ func capacityByFreeVolumeCount(diskType types.DiskType) CapacityFunc {
}
var ecShardCount int
for _, ecShardInfo := range diskInfo.EcShardInfos {
ecShardCount += erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIdCount()
ecShardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShardInfo)
}
return float64(diskInfo.MaxVolumeCount-diskInfo.VolumeCount) - float64(ecShardCount)/erasure_coding.DataShardsCount
}

View File

@@ -251,25 +251,22 @@ func (c *commandVolumeList) writeDiskInfo(writer io.Writer, t *master_pb.DiskInf
}
// Build shard size information
shardIds := erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds()
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo)
var totalSize int64
var shardSizeInfo string
if len(ecShardInfo.ShardSizes) > 0 {
var shardDetails []string
for _, shardId := range shardIds {
if size, found := erasure_coding.GetShardSize(ecShardInfo, erasure_coding.ShardId(shardId)); found {
shardDetails = append(shardDetails, fmt.Sprintf("%d:%s", shardId, util.BytesToHumanReadable(uint64(size))))
totalSize += size
} else {
shardDetails = append(shardDetails, fmt.Sprintf("%d:?", shardId))
}
for _, shardId := range si.Ids() {
size := int64(si.Size(shardId))
shardDetails = append(shardDetails, fmt.Sprintf("%d:%s", shardId, util.BytesToHumanReadable(uint64(size))))
totalSize += size
}
shardSizeInfo = fmt.Sprintf(" sizes:[%s] total:%s", strings.Join(shardDetails, " "), util.BytesToHumanReadable(uint64(totalSize)))
}
output(verbosityLevel >= 5, writer, " ec volume id:%v collection:%v shards:%v%s %s\n",
ecShardInfo.Id, ecShardInfo.Collection, shardIds, shardSizeInfo, expireAtString)
ecShardInfo.Id, ecShardInfo.Collection, si.Ids(), shardSizeInfo, expireAtString)
}
output((volumeInfosFound || ecShardInfoFound) && verbosityLevel >= 4, writer, " Disk %s %+v \n", diskType, s)
return s

View File

@@ -108,15 +108,17 @@ func parseOutput(output string) *master_pb.TopologyInfo {
if strings.HasPrefix(part, "collection:") {
ecShard.Collection = part[len("collection:"):]
}
// TODO: we need to parse EC shard sizes as well
if strings.HasPrefix(part, "shards:") {
shards := part[len("shards:["):]
shards = strings.TrimRight(shards, "]")
shardBits := erasure_coding.ShardBits(0)
shardsInfo := erasure_coding.NewShardsInfo()
for _, shardId := range strings.Split(shards, ",") {
sid, _ := strconv.Atoi(shardId)
shardBits = shardBits.AddShardId(erasure_coding.ShardId(sid))
shardsInfo.Set(erasure_coding.ShardId(sid), 0)
}
ecShard.EcIndexBits = uint32(shardBits)
ecShard.EcIndexBits = shardsInfo.Bitmap()
ecShard.ShardSizes = shardsInfo.SizesInt64()
}
}
disk.EcShardInfos = append(disk.EcShardInfos, ecShard)

View File

@@ -197,8 +197,8 @@ func (c *commandVolumeServerEvacuate) evacuateEcVolumes(commandEnv *CommandEnv,
}
func (c *commandVolumeServerEvacuate) moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEcShardInformationMessage, thisNode *EcNode, otherNodes []*EcNode, applyChange bool, diskType types.DiskType, writer io.Writer) (hasMoved bool, err error) {
for _, shardId := range erasure_coding.ShardBits(ecShardInfo.EcIndexBits).ShardIds() {
si := erasure_coding.ShardsInfoFromVolumeEcShardInformationMessage(ecShardInfo)
for _, shardId := range si.Ids() {
// Sort by: 1) fewest shards of this volume, 2) most free EC slots
// This ensures we prefer nodes with capacity and balanced shard distribution
slices.SortFunc(otherNodes, func(a, b *EcNode) int {

View File

@@ -43,7 +43,7 @@ func NewTopologyDistributionAnalysis() *TopologyDistributionAnalysis {
}
// AddNode adds a node and its shards to the analysis
func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardBits erasure_coding.ShardBits) {
func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardsInfo *erasure_coding.ShardsInfo) {
nodeId := node.info.Id
// Create distribution.TopologyNode from EcNode
@@ -52,18 +52,15 @@ func (a *TopologyDistributionAnalysis) AddNode(node *EcNode, shardBits erasure_c
DataCenter: string(node.dc),
Rack: string(node.rack),
FreeSlots: node.freeEcSlot,
TotalShards: shardBits.ShardIdCount(),
}
for _, shardId := range shardBits.ShardIds() {
topoNode.ShardIDs = append(topoNode.ShardIDs, int(shardId))
TotalShards: shardsInfo.Count(),
ShardIDs: shardsInfo.IdsInt(),
}
a.inner.AddNode(topoNode)
a.nodeMap[nodeId] = node
// Add shard locations
for _, shardId := range shardBits.ShardIds() {
for _, shardId := range shardsInfo.Ids() {
a.inner.AddShardLocation(distribution.ShardLocation{
ShardID: int(shardId),
NodeID: nodeId,
@@ -120,9 +117,9 @@ func AnalyzeVolumeDistribution(volumeId needle.VolumeId, locations []*EcNode, di
analysis := NewTopologyDistributionAnalysis()
for _, node := range locations {
shardBits := findEcVolumeShards(node, volumeId, diskType)
if shardBits.ShardIdCount() > 0 {
analysis.AddNode(node, shardBits)
si := findEcVolumeShardsInfo(node, volumeId, diskType)
if si.Count() > 0 {
analysis.AddNode(node, si)
}
}
@@ -207,8 +204,8 @@ func (r *ProportionalECRebalancer) PlanMoves(
// Add shard locations from nodes that have shards
for _, node := range locations {
nodeId := node.info.Id
shardBits := findEcVolumeShards(node, volumeId, r.diskType)
for _, shardId := range shardBits.ShardIds() {
si := findEcVolumeShardsInfo(node, volumeId, r.diskType)
for _, shardId := range si.Ids() {
analysis.AddShardLocation(distribution.ShardLocation{
ShardID: int(shardId),
NodeID: nodeId,

View File

@@ -44,7 +44,7 @@ func TestECRebalanceWithLimitedSlots(t *testing.T) {
shardCount := 0
for _, diskInfo := range node.info.DiskInfos {
for _, ecShard := range diskInfo.EcShardInfos {
shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount()
shardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard)
}
}
t.Logf(" Node %s (rack %s): %d shards, %d free slots",
@@ -56,7 +56,7 @@ func TestECRebalanceWithLimitedSlots(t *testing.T) {
for _, node := range ecNodes {
for _, diskInfo := range node.info.DiskInfos {
for _, ecShard := range diskInfo.EcShardInfos {
totalEcShards += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount()
totalEcShards += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard)
}
}
}
@@ -122,7 +122,7 @@ func TestECRebalanceZeroFreeSlots(t *testing.T) {
shardCount := 0
for _, diskInfo := range node.info.DiskInfos {
for _, ecShard := range diskInfo.EcShardInfos {
shardCount += erasure_coding.ShardBits(ecShard.EcIndexBits).ShardIdCount()
shardCount += erasure_coding.ShardsCountFromVolumeEcShardInformationMessage(ecShard)
}
}
t.Logf(" Node %s: %d shards, %d free slots, volumeCount=%d, max=%d",
@@ -226,14 +226,15 @@ func buildZeroFreeSlotTopology() *master_pb.TopologyInfo {
func buildEcShards(volumeIds []uint32) []*master_pb.VolumeEcShardInformationMessage {
var shards []*master_pb.VolumeEcShardInformationMessage
for _, vid := range volumeIds {
allShardBits := erasure_coding.ShardBits(0)
for i := 0; i < erasure_coding.TotalShardsCount; i++ {
allShardBits = allShardBits.AddShardId(erasure_coding.ShardId(i))
si := erasure_coding.NewShardsInfo()
for _, id := range erasure_coding.AllShardIds() {
si.Set(id, 1234)
}
shards = append(shards, &master_pb.VolumeEcShardInformationMessage{
Id: vid,
Collection: "ectest",
EcIndexBits: uint32(allShardBits),
EcIndexBits: si.Bitmap(),
ShardSizes: si.SizesInt64(),
})
}
return shards