Admin UI: include ec shard sizes into volume server info (#7071)

* show ec shards on dashboard, show max in its own column

* master collect shard size info

* master send shard size via VolumeList

* change to more efficient shard sizes slice

* include ec shard sizes into volume server info

* Eliminated Redundant gRPC Calls

* much more efficient

* Efficient Counting: bits.OnesCount32() uses CPU-optimized instructions to count set bits in O(1) (a sketch of this follows the change summary below)

* avoid extra volume list call

* simplify

* preserve existing shard sizes

* avoid hard coded value

* Update weed/storage/erasure_coding/ec_volume_info.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update weed/admin/dash/volume_management.go

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update ec_volume_info.go

* address comments

* avoid duplicated functions

* Update weed/admin/dash/volume_management.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* simplify

* refactoring

* fix compilation

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Authored by Chris Lu on 2025-08-02 02:16:49 -07:00, committed by GitHub
parent 3d4e8409a5
commit 9d013ea9b8
19 changed files with 1144 additions and 319 deletions
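The "Efficient Counting" and "more efficient shard sizes slice" notes above describe storing each server's EC shard sizes as a compact slice, one entry per set bit of the shard bitmap, and using bits.OnesCount32() to turn a shard id into a slice index. Below is a minimal Go sketch of that idea; the helper name and slice layout are assumptions for illustration, not the exact master_pb representation used by SeaweedFS.

package main

import (
	"fmt"
	"math/bits"
)

// shardSizeAt returns the size of shardId from a compact sizes slice that
// stores one entry per set bit of ecIndexBits, ordered by shard id.
// The slice index is the popcount of the bits below shardId, so the lookup
// needs no map and no scan. (Illustrative helper; layout is an assumption.)
func shardSizeAt(ecIndexBits uint32, sizes []int64, shardId int) (int64, bool) {
	mask := uint32(1) << shardId
	if ecIndexBits&mask == 0 {
		return 0, false // this server does not hold that shard
	}
	idx := bits.OnesCount32(ecIndexBits & (mask - 1))
	if idx >= len(sizes) {
		return 0, false // sizes slice shorter than the bitmap implies
	}
	return sizes[idx], true
}

func main() {
	// A server holding shards 0, 2, and 5 has bitmap 0b100101; the sizes
	// slice then carries exactly three entries, in shard-id order.
	ecIndexBits := uint32(0b100101)
	sizes := []int64{1024, 2048, 4096}

	size, ok := shardSizeAt(ecIndexBits, sizes, 5)
	fmt.Println(size, ok)                                       // 4096 true
	fmt.Println("shards held:", bits.OnesCount32(ecIndexBits)) // 3
}

In the diff below, the actual lookup sits behind erasure_coding.GetShardSize() and erasure_coding.ShardBits; the sketch only shows why a popcount makes the slice lookup O(1).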

View File

@@ -23,6 +23,10 @@ type AdminData struct {
MessageBrokers []MessageBrokerNode `json:"message_brokers"`
DataCenters []DataCenter `json:"datacenters"`
LastUpdated time.Time `json:"last_updated"`
// EC shard totals for dashboard
TotalEcVolumes int `json:"total_ec_volumes"` // Total number of EC volumes across all servers
TotalEcShards int `json:"total_ec_shards"` // Total number of EC shards across all servers
}
// Object Store Users management structures
@@ -98,6 +102,13 @@ func (s *AdminServer) GetAdminData(username string) (AdminData, error) {
return AdminData{}, err
}
// Get volume servers data with EC shard information
volumeServersData, err := s.GetClusterVolumeServers()
if err != nil {
glog.Errorf("Failed to get cluster volume servers: %v", err)
return AdminData{}, err
}
// Get master nodes status
masterNodes := s.getMasterNodesStatus()
@@ -122,6 +133,19 @@ func (s *AdminServer) GetAdminData(username string) (AdminData, error) {
// Keep default value on error
}
// Calculate EC shard totals
var totalEcVolumes, totalEcShards int
ecVolumeSet := make(map[uint32]bool) // To avoid counting the same EC volume multiple times
for _, vs := range volumeServersData.VolumeServers {
totalEcShards += vs.EcShards
// Count unique EC volumes across all servers
for _, ecInfo := range vs.EcShardDetails {
ecVolumeSet[ecInfo.VolumeID] = true
}
}
totalEcVolumes = len(ecVolumeSet)
// Prepare admin data
adminData := AdminData{
Username: username,
@@ -130,11 +154,13 @@ func (s *AdminServer) GetAdminData(username string) (AdminData, error) {
TotalSize: topology.TotalSize,
VolumeSizeLimitMB: volumeSizeLimitMB,
MasterNodes: masterNodes,
VolumeServers: topology.VolumeServers,
VolumeServers: volumeServersData.VolumeServers,
FilerNodes: filerNodes,
MessageBrokers: messageBrokers,
DataCenters: topology.DataCenters,
LastUpdated: topology.UpdatedAt,
TotalEcVolumes: totalEcVolumes,
TotalEcShards: totalEcShards,
}
return adminData, nil

View File

@@ -76,6 +76,13 @@ func (s *AdminServer) getTopologyViaGRPC(topology *ClusterTopology) error {
totalSize += int64(volInfo.Size)
totalFiles += int64(volInfo.FileCount)
}
// Sum up EC shard sizes
for _, ecShardInfo := range diskInfo.EcShardInfos {
for _, shardSize := range ecShardInfo.ShardSizes {
totalSize += shardSize
}
}
}
vs := VolumeServer{

View File

@@ -44,6 +44,22 @@ type VolumeServer struct {
DiskUsage int64 `json:"disk_usage"`
DiskCapacity int64 `json:"disk_capacity"`
LastHeartbeat time.Time `json:"last_heartbeat"`
// EC shard information
EcVolumes int `json:"ec_volumes"` // Number of EC volumes this server has shards for
EcShards int `json:"ec_shards"` // Total number of EC shards on this server
EcShardDetails []VolumeServerEcInfo `json:"ec_shard_details"` // Detailed EC shard information
}
// VolumeServerEcInfo represents EC shard information for a specific volume on a server
type VolumeServerEcInfo struct {
VolumeID uint32 `json:"volume_id"`
Collection string `json:"collection"`
ShardCount int `json:"shard_count"` // Number of shards this server has for this volume
EcIndexBits uint32 `json:"ec_index_bits"` // Bitmap of which shards this server has
ShardNumbers []int `json:"shard_numbers"` // List of actual shard numbers this server has
ShardSizes map[int]int64 `json:"shard_sizes"` // Map from shard number to size in bytes
TotalSize int64 `json:"total_size"` // Total size of all shards on this server for this volume
}
// S3 Bucket management structures

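To make the field relationships concrete, here is a hypothetical populated value of the VolumeServerEcInfo type defined above, shown as a Go fragment, for a server holding shards 0 through 3 of one EC volume (all numbers are illustrative):

// Hypothetical example only: a server holding shards 0-3 of EC volume 42.
ecInfo := VolumeServerEcInfo{
	VolumeID:     42,
	Collection:   "pics",
	ShardCount:   4,
	EcIndexBits:  0b1111, // one bit per shard held (shards 0-3)
	ShardNumbers: []int{0, 1, 2, 3},
	ShardSizes: map[int]int64{
		0: 104857600, // 100 MiB per shard, purely illustrative
		1: 104857600,
		2: 104857600,
		3: 104857600,
	},
	TotalSize: 4 * 104857600,
}

EcIndexBits and ShardNumbers encode the same membership in two forms, so the dashboard templates can show the shard count and total size directly without re-deriving them from the bitmap.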
View File

@@ -7,6 +7,7 @@ import (
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
)
// GetClusterVolumes retrieves cluster volumes data with pagination, sorting, and filtering
@@ -26,6 +27,7 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
}
var volumes []VolumeWithTopology
var totalSize int64
var cachedTopologyInfo *master_pb.TopologyInfo
// Get detailed volume information via gRPC
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
@@ -34,11 +36,15 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
return err
}
// Cache the topology info for reuse
cachedTopologyInfo = resp.TopologyInfo
if resp.TopologyInfo != nil {
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
// Process regular volumes
for _, volInfo := range diskInfo.VolumeInfos {
volume := VolumeWithTopology{
VolumeInformationMessage: volInfo,
@@ -49,6 +55,14 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
volumes = append(volumes, volume)
totalSize += int64(volInfo.Size)
}
// Process EC shards in the same loop
for _, ecShardInfo := range diskInfo.EcShardInfos {
// Add all shard sizes for this EC volume
for _, shardSize := range ecShardInfo.ShardSizes {
totalSize += shardSize
}
}
}
}
}
@@ -66,6 +80,8 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
if collection != "" {
var filteredVolumes []VolumeWithTopology
var filteredTotalSize int64
var filteredEcTotalSize int64
for _, volume := range volumes {
// Handle "default" collection filtering for empty collections
volumeCollection := volume.Collection
@@ -78,8 +94,36 @@ func (s *AdminServer) GetClusterVolumes(page int, pageSize int, sortBy string, s
filteredTotalSize += int64(volume.Size)
}
}
// Filter EC shard sizes by collection using already processed data
// This reuses the topology info cached above, avoiding a second pass over the topology
if cachedTopologyInfo != nil {
for _, dc := range cachedTopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
for _, diskInfo := range node.DiskInfos {
for _, ecShardInfo := range diskInfo.EcShardInfos {
// Handle "default" collection filtering for empty collections
ecCollection := ecShardInfo.Collection
if ecCollection == "" {
ecCollection = "default"
}
if ecCollection == collection {
// Add all shard sizes for this EC volume
for _, shardSize := range ecShardInfo.ShardSizes {
filteredEcTotalSize += shardSize
}
}
}
}
}
}
}
}
volumes = filteredVolumes
totalSize = filteredTotalSize
totalSize = filteredTotalSize + filteredEcTotalSize
}
// Calculate unique data center, rack, disk type, collection, and version counts from filtered volumes
@@ -370,23 +414,151 @@ func (s *AdminServer) VacuumVolume(volumeID int, server string) error {
})
}
// GetClusterVolumeServers retrieves cluster volume servers data
// GetClusterVolumeServers retrieves cluster volume servers data including EC shard information
func (s *AdminServer) GetClusterVolumeServers() (*ClusterVolumeServersData, error) {
topology, err := s.GetClusterTopology()
var volumeServerMap map[string]*VolumeServer
// Make only ONE VolumeList call and use it for both topology building AND EC shard processing
err := s.WithMasterClient(func(client master_pb.SeaweedClient) error {
resp, err := client.VolumeList(context.Background(), &master_pb.VolumeListRequest{})
if err != nil {
return err
}
// Get volume size limit from response, default to 30GB if not set
volumeSizeLimitMB := resp.VolumeSizeLimitMb
if volumeSizeLimitMB == 0 {
volumeSizeLimitMB = 30000 // default to 30000MB (30GB)
}
// Build basic topology from the VolumeList response (replaces GetClusterTopology call)
volumeServerMap = make(map[string]*VolumeServer)
if resp.TopologyInfo != nil {
// Process topology to build basic volume server info (similar to cluster_topology.go logic)
for _, dc := range resp.TopologyInfo.DataCenterInfos {
for _, rack := range dc.RackInfos {
for _, node := range rack.DataNodeInfos {
// Initialize volume server if not exists
if volumeServerMap[node.Id] == nil {
volumeServerMap[node.Id] = &VolumeServer{
Address: node.Id,
DataCenter: dc.Id,
Rack: rack.Id,
Volumes: 0,
DiskUsage: 0,
DiskCapacity: 0,
EcVolumes: 0,
EcShards: 0,
EcShardDetails: []VolumeServerEcInfo{},
}
}
vs := volumeServerMap[node.Id]
// Process EC shard information for this server at volume server level (not per-disk)
ecVolumeMap := make(map[uint32]*VolumeServerEcInfo)
// Temporary map to accumulate shard info across disks
ecShardAccumulator := make(map[uint32][]*master_pb.VolumeEcShardInformationMessage)
// Process disk information
for _, diskInfo := range node.DiskInfos {
vs.DiskCapacity += int64(diskInfo.MaxVolumeCount) * int64(volumeSizeLimitMB) * 1024 * 1024 // Use actual volume size limit
// Count regular volumes and calculate disk usage
for _, volInfo := range diskInfo.VolumeInfos {
vs.Volumes++
vs.DiskUsage += int64(volInfo.Size)
}
// Accumulate EC shard information across all disks for this volume server
for _, ecShardInfo := range diskInfo.EcShardInfos {
volumeId := ecShardInfo.Id
ecShardAccumulator[volumeId] = append(ecShardAccumulator[volumeId], ecShardInfo)
}
}
// Process accumulated EC shard information per volume
for volumeId, ecShardInfos := range ecShardAccumulator {
if len(ecShardInfos) == 0 {
continue
}
// Initialize EC volume info
ecInfo := &VolumeServerEcInfo{
VolumeID: volumeId,
Collection: ecShardInfos[0].Collection,
ShardCount: 0,
EcIndexBits: 0,
ShardNumbers: []int{},
ShardSizes: make(map[int]int64),
TotalSize: 0,
}
// Merge EcIndexBits from all disks and collect shard sizes
allShardSizes := make(map[erasure_coding.ShardId]int64)
for _, ecShardInfo := range ecShardInfos {
ecInfo.EcIndexBits |= ecShardInfo.EcIndexBits
// Collect shard sizes from this disk
shardBits := erasure_coding.ShardBits(ecShardInfo.EcIndexBits)
shardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
if size, found := erasure_coding.GetShardSize(ecShardInfo, shardId); found {
allShardSizes[shardId] = size
}
})
}
// Process final merged shard information
finalShardBits := erasure_coding.ShardBits(ecInfo.EcIndexBits)
finalShardBits.EachSetIndex(func(shardId erasure_coding.ShardId) {
ecInfo.ShardCount++
ecInfo.ShardNumbers = append(ecInfo.ShardNumbers, int(shardId))
vs.EcShards++
// Add shard size if available
if shardSize, exists := allShardSizes[shardId]; exists {
ecInfo.ShardSizes[int(shardId)] = shardSize
ecInfo.TotalSize += shardSize
vs.DiskUsage += shardSize // Add EC shard size to total disk usage
}
})
ecVolumeMap[volumeId] = ecInfo
}
// Convert EC volume map to slice and update volume server (after processing all disks)
for _, ecInfo := range ecVolumeMap {
vs.EcShardDetails = append(vs.EcShardDetails, *ecInfo)
vs.EcVolumes++
}
}
}
}
}
return nil
})
if err != nil {
return nil, err
}
// Convert map back to slice
var volumeServers []VolumeServer
for _, vs := range volumeServerMap {
volumeServers = append(volumeServers, *vs)
}
var totalCapacity int64
var totalVolumes int
for _, vs := range topology.VolumeServers {
for _, vs := range volumeServers {
totalCapacity += vs.DiskCapacity
totalVolumes += vs.Volumes
}
return &ClusterVolumeServersData{
VolumeServers: topology.VolumeServers,
TotalVolumeServers: len(topology.VolumeServers),
VolumeServers: volumeServers,
TotalVolumeServers: len(volumeServers),
TotalVolumes: totalVolumes,
TotalCapacity: totalCapacity,
LastUpdated: time.Now(),