* feat(balance): add replica placement validation for volume moves

  When the volume balance detection proposes moving a volume, validate that the move does not violate the volume's replication policy (e.g., ReplicaPlacement=010 requires replicas on different racks). If the preferred destination violates the policy, fall back to score-based planning; if that also violates, skip the volume entirely.

  - Add ReplicaLocation type and VolumeReplicaMap to ClusterInfo
  - Build replica map from all volumes before collection filtering
  - Port placement validation logic from command_volume_fix_replication.go
  - Thread replica map through collectVolumeMetrics call chain
  - Add IsGoodMove check in createBalanceTask before destination use

* address PR review: extract validation closure, add defensive checks

  - Extract validateMove closure to eliminate duplicated ReplicaLocation construction and IsGoodMove calls
  - Add defensive check for empty replica map entries (len(replicas) == 0)
  - Add bounds check for int-to-byte cast on ExpectedReplicas (0-255)

* address nitpick: rp test helper accepts *testing.T and fails on error

  Prevents silent failures from typos in replica placement codes.

* address review: add composite replica placement tests (011, 110)

  Test multi-constraint placement policies where both rack and DC rules must be satisfied simultaneously.

* address review: use struct keys instead of string concatenation

  Replace string-concatenated map keys with typed rackKey/nodeKey structs to eliminate allocations and avoid ambiguity if IDs contain spaces.
* address review: simplify bounds check, log fallback error, guard source

  - Remove unreachable ExpectedReplicas < 0 branch (outer condition already guarantees > 0), fold bounds check into single condition
  - Log error from planBalanceDestination in replica validation fallback
  - Return false from IsGoodMove when sourceNodeID not found in existing replicas (inconsistent cluster state)

* address review: use slices.Contains instead of hand-rolled helpers

  Replace isAmongDC and isAmongRack with slices.Contains from the standard library, reducing boilerplate.
206 lines
6.3 KiB
Go
206 lines
6.3 KiB
Go
package pluginworker
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"regexp"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/admin/topology"
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb"
|
|
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
|
|
workertypes "github.com/seaweedfs/seaweedfs/weed/worker/types"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
func collectVolumeMetricsFromMasters(
|
|
ctx context.Context,
|
|
masterAddresses []string,
|
|
collectionFilter string,
|
|
grpcDialOption grpc.DialOption,
|
|
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) {
|
|
if grpcDialOption == nil {
|
|
return nil, nil, nil, fmt.Errorf("grpc dial option is not configured")
|
|
}
|
|
if len(masterAddresses) == 0 {
|
|
return nil, nil, nil, fmt.Errorf("no master addresses provided in cluster context")
|
|
}
|
|
|
|
for _, masterAddress := range masterAddresses {
|
|
response, err := fetchVolumeList(ctx, masterAddress, grpcDialOption)
|
|
if err != nil {
|
|
glog.Warningf("Plugin worker failed master volume list at %s: %v", masterAddress, err)
|
|
continue
|
|
}
|
|
|
|
metrics, activeTopology, replicaMap, buildErr := buildVolumeMetrics(response, collectionFilter)
|
|
if buildErr != nil {
|
|
// Configuration errors (e.g. invalid regex) will fail on every master,
|
|
// so return immediately instead of masking them with retries.
|
|
if isConfigError(buildErr) {
|
|
return nil, nil, nil, buildErr
|
|
}
|
|
glog.Warningf("Plugin worker failed to build metrics from master %s: %v", masterAddress, buildErr)
|
|
continue
|
|
}
|
|
return metrics, activeTopology, replicaMap, nil
|
|
}
|
|
|
|
return nil, nil, nil, fmt.Errorf("failed to load topology from all provided masters")
|
|
}
|
|
|
|
func fetchVolumeList(ctx context.Context, address string, grpcDialOption grpc.DialOption) (*master_pb.VolumeListResponse, error) {
|
|
var lastErr error
|
|
for _, candidate := range masterAddressCandidates(address) {
|
|
if ctx.Err() != nil {
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
dialCtx, cancelDial := context.WithTimeout(ctx, 5*time.Second)
|
|
conn, err := pb.GrpcDial(dialCtx, candidate, false, grpcDialOption)
|
|
cancelDial()
|
|
if err != nil {
|
|
lastErr = err
|
|
continue
|
|
}
|
|
|
|
client := master_pb.NewSeaweedClient(conn)
|
|
callCtx, cancelCall := context.WithTimeout(ctx, 10*time.Second)
|
|
response, callErr := client.VolumeList(callCtx, &master_pb.VolumeListRequest{})
|
|
cancelCall()
|
|
_ = conn.Close()
|
|
|
|
if callErr == nil {
|
|
return response, nil
|
|
}
|
|
lastErr = callErr
|
|
}
|
|
|
|
if lastErr == nil {
|
|
lastErr = fmt.Errorf("no valid master address candidate")
|
|
}
|
|
return nil, lastErr
|
|
}
|
|
|
|
func buildVolumeMetrics(
|
|
response *master_pb.VolumeListResponse,
|
|
collectionFilter string,
|
|
) ([]*workertypes.VolumeHealthMetrics, *topology.ActiveTopology, map[uint32][]workertypes.ReplicaLocation, error) {
|
|
if response == nil || response.TopologyInfo == nil {
|
|
return nil, nil, nil, fmt.Errorf("volume list response has no topology info")
|
|
}
|
|
|
|
activeTopology := topology.NewActiveTopology(10)
|
|
if err := activeTopology.UpdateTopology(response.TopologyInfo); err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
var collectionRegex *regexp.Regexp
|
|
trimmedFilter := strings.TrimSpace(collectionFilter)
|
|
if trimmedFilter != "" && trimmedFilter != collectionFilterAll && trimmedFilter != collectionFilterEach && trimmedFilter != "*" {
|
|
var err error
|
|
collectionRegex, err = regexp.Compile(trimmedFilter)
|
|
if err != nil {
|
|
return nil, nil, nil, &configError{err: fmt.Errorf("invalid collection_filter regex %q: %w", trimmedFilter, err)}
|
|
}
|
|
}
|
|
|
|
volumeSizeLimitBytes := uint64(response.VolumeSizeLimitMb) * 1024 * 1024
|
|
now := time.Now()
|
|
metrics := make([]*workertypes.VolumeHealthMetrics, 0, 256)
|
|
replicaMap := make(map[uint32][]workertypes.ReplicaLocation)
|
|
|
|
for _, dc := range response.TopologyInfo.DataCenterInfos {
|
|
for _, rack := range dc.RackInfos {
|
|
for _, node := range rack.DataNodeInfos {
|
|
for diskType, diskInfo := range node.DiskInfos {
|
|
for _, volume := range diskInfo.VolumeInfos {
|
|
// Build replica map from ALL volumes BEFORE collection filtering,
|
|
// since replicas may span filtered/unfiltered nodes.
|
|
replicaMap[volume.Id] = append(replicaMap[volume.Id], workertypes.ReplicaLocation{
|
|
DataCenter: dc.Id,
|
|
Rack: rack.Id,
|
|
NodeID: node.Id,
|
|
})
|
|
|
|
if collectionRegex != nil && !collectionRegex.MatchString(volume.Collection) {
|
|
continue
|
|
}
|
|
|
|
metric := &workertypes.VolumeHealthMetrics{
|
|
VolumeID: volume.Id,
|
|
Server: node.Id,
|
|
ServerAddress: string(pb.NewServerAddressFromDataNode(node)),
|
|
DiskType: diskType,
|
|
DiskId: volume.DiskId,
|
|
DataCenter: dc.Id,
|
|
Rack: rack.Id,
|
|
Collection: volume.Collection,
|
|
Size: volume.Size,
|
|
DeletedBytes: volume.DeletedByteCount,
|
|
LastModified: time.Unix(volume.ModifiedAtSecond, 0),
|
|
ReplicaCount: 1,
|
|
ExpectedReplicas: int(volume.ReplicaPlacement),
|
|
IsReadOnly: volume.ReadOnly,
|
|
}
|
|
if metric.Size > 0 {
|
|
metric.GarbageRatio = float64(metric.DeletedBytes) / float64(metric.Size)
|
|
}
|
|
if volumeSizeLimitBytes > 0 {
|
|
metric.FullnessRatio = float64(metric.Size) / float64(volumeSizeLimitBytes)
|
|
}
|
|
metric.Age = now.Sub(metric.LastModified)
|
|
metrics = append(metrics, metric)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
replicaCounts := make(map[uint32]int)
|
|
for _, metric := range metrics {
|
|
replicaCounts[metric.VolumeID]++
|
|
}
|
|
for _, metric := range metrics {
|
|
metric.ReplicaCount = replicaCounts[metric.VolumeID]
|
|
}
|
|
|
|
return metrics, activeTopology, replicaMap, nil
|
|
}
|
|
|
|
// configError marks an error as a configuration problem. Such errors should
// not be retried across masters, since the same configuration would fail on
// each of them.
type configError struct {
	err error // underlying cause
}

// Error returns the message of the wrapped error unchanged.
func (ce *configError) Error() string {
	return ce.err.Error()
}

// Unwrap exposes the wrapped error so errors.Is and errors.As can see through
// the configError.
func (ce *configError) Unwrap() error {
	return ce.err
}
|
|
|
|
func isConfigError(err error) bool {
|
|
var ce *configError
|
|
return errors.As(err, &ce)
|
|
}
|
|
|
|
func masterAddressCandidates(address string) []string {
|
|
trimmed := strings.TrimSpace(address)
|
|
if trimmed == "" {
|
|
return nil
|
|
}
|
|
candidateSet := map[string]struct{}{
|
|
trimmed: {},
|
|
}
|
|
converted := pb.ServerToGrpcAddress(trimmed)
|
|
candidateSet[converted] = struct{}{}
|
|
|
|
candidates := make([]string, 0, len(candidateSet))
|
|
for candidate := range candidateSet {
|
|
candidates = append(candidates, candidate)
|
|
}
|
|
sort.Strings(candidates)
|
|
return candidates
|
|
}
|