Use master shard sizes for EC volumes (#8423)
* Use master shard sizes for EC volumes * Remove EC volume shard size fallback * Remove unused EC dash imports
This commit is contained in:
@@ -6,10 +6,7 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
||||||
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
|
||||||
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -333,10 +330,19 @@ func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string,
|
|||||||
|
|
||||||
// Process each shard this server has for this volume
|
// Process each shard this server has for this volume
|
||||||
shardBits := ecShardInfo.EcIndexBits
|
shardBits := ecShardInfo.EcIndexBits
|
||||||
|
shardSizes := ecShardInfo.ShardSizes
|
||||||
|
sizeIndex := 0
|
||||||
for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
|
for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
|
||||||
if (shardBits & (1 << uint(shardId))) != 0 {
|
if (shardBits & (1 << uint(shardId))) != 0 {
|
||||||
// Record shard location
|
// Record shard location
|
||||||
volume.ShardLocations[shardId] = node.Id
|
volume.ShardLocations[shardId] = node.Id
|
||||||
|
if sizeIndex < len(shardSizes) {
|
||||||
|
size := shardSizes[sizeIndex]
|
||||||
|
if size >= 0 {
|
||||||
|
volume.ShardSizes[shardId] = size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sizeIndex++
|
||||||
totalShards++
|
totalShards++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -354,38 +360,6 @@ func (s *AdminServer) GetClusterEcVolumes(page int, pageSize int, sortBy string,
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect shard size information from volume servers
|
|
||||||
for volumeId, volume := range volumeData {
|
|
||||||
// Group servers by volume to minimize gRPC calls
|
|
||||||
serverHasVolume := make(map[string]bool)
|
|
||||||
for _, server := range volume.Servers {
|
|
||||||
serverHasVolume[server] = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// Query each server for shard sizes
|
|
||||||
for server := range serverHasVolume {
|
|
||||||
err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
|
|
||||||
resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
|
|
||||||
VolumeId: volumeId,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeId, err)
|
|
||||||
return nil // Continue with other servers, don't fail the entire request
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update shard sizes
|
|
||||||
for _, shardInfo := range resp.EcShardInfos {
|
|
||||||
volume.ShardSizes[int(shardInfo.ShardId)] = shardInfo.Size
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate completeness for each volume
|
// Calculate completeness for each volume
|
||||||
completeVolumes := 0
|
completeVolumes := 0
|
||||||
incompleteVolumes := 0
|
incompleteVolumes := 0
|
||||||
@@ -616,13 +590,23 @@ func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrd
|
|||||||
|
|
||||||
// Create individual shard entries for each shard this server has
|
// Create individual shard entries for each shard this server has
|
||||||
shardBits := ecShardInfo.EcIndexBits
|
shardBits := ecShardInfo.EcIndexBits
|
||||||
|
shardSizes := ecShardInfo.ShardSizes
|
||||||
|
sizeIndex := 0
|
||||||
for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
|
for shardId := 0; shardId < erasure_coding.MaxShardCount; shardId++ {
|
||||||
if (shardBits & (1 << uint(shardId))) != 0 {
|
if (shardBits & (1 << uint(shardId))) != 0 {
|
||||||
|
var shardSize uint64
|
||||||
|
if sizeIndex < len(shardSizes) {
|
||||||
|
size := shardSizes[sizeIndex]
|
||||||
|
if size >= 0 {
|
||||||
|
shardSize = uint64(size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sizeIndex++
|
||||||
ecShard := EcShardWithInfo{
|
ecShard := EcShardWithInfo{
|
||||||
VolumeID: ecShardInfo.Id,
|
VolumeID: ecShardInfo.Id,
|
||||||
ShardID: uint32(shardId),
|
ShardID: uint32(shardId),
|
||||||
Collection: ecShardInfo.Collection,
|
Collection: ecShardInfo.Collection,
|
||||||
Size: 0, // EC shards don't have individual size in the API response
|
Size: shardSize,
|
||||||
Server: node.Id,
|
Server: node.Id,
|
||||||
DataCenter: dc.Id,
|
DataCenter: dc.Id,
|
||||||
Rack: rack.Id,
|
Rack: rack.Id,
|
||||||
@@ -653,46 +637,6 @@ func (s *AdminServer) GetEcVolumeDetails(volumeID uint32, sortBy string, sortOrd
|
|||||||
return nil, fmt.Errorf("EC volume %d not found", volumeID)
|
return nil, fmt.Errorf("EC volume %d not found", volumeID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Collect shard size information from volume servers
|
|
||||||
shardSizeMap := make(map[string]map[uint32]uint64) // server -> shardId -> size
|
|
||||||
for _, shard := range shards {
|
|
||||||
server := shard.Server
|
|
||||||
if _, exists := shardSizeMap[server]; !exists {
|
|
||||||
// Query this server for shard sizes
|
|
||||||
err := s.WithVolumeServerClient(pb.ServerAddress(server), func(client volume_server_pb.VolumeServerClient) error {
|
|
||||||
resp, err := client.VolumeEcShardsInfo(context.Background(), &volume_server_pb.VolumeEcShardsInfoRequest{
|
|
||||||
VolumeId: volumeID,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
glog.V(1).Infof("Failed to get EC shard info from %s for volume %d: %v", server, volumeID, err)
|
|
||||||
return nil // Continue with other servers, don't fail the entire request
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store shard sizes for this server
|
|
||||||
shardSizeMap[server] = make(map[uint32]uint64)
|
|
||||||
for _, shardInfo := range resp.EcShardInfos {
|
|
||||||
shardSizeMap[server][shardInfo.ShardId] = uint64(shardInfo.Size)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
glog.V(1).Infof("Failed to connect to volume server %s: %v", server, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update shard sizes in the shards array
|
|
||||||
for i := range shards {
|
|
||||||
server := shards[i].Server
|
|
||||||
shardId := shards[i].ShardID
|
|
||||||
if serverSizes, exists := shardSizeMap[server]; exists {
|
|
||||||
if size, exists := serverSizes[shardId]; exists {
|
|
||||||
shards[i].Size = size
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Calculate completeness based on unique shard IDs
|
// Calculate completeness based on unique shard IDs
|
||||||
foundShards := make(map[int]bool)
|
foundShards := make(map[int]bool)
|
||||||
for _, shard := range shards {
|
for _, shard := range shards {
|
||||||
|
|||||||
Reference in New Issue
Block a user