Master: volume assignment concurrency (#7159)

* volume assginment concurrency

* accurate tests

* ensure uniqness

* reserve atomically

* address comments

* atomic

* ReserveOneVolumeForReservation

* duplicated

* Update weed/topology/node.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* Update weed/topology/node.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

* atomic counter

* dedup

* select the appropriate functions based on the useReservations flag

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Chris Lu
2025-08-23 21:02:30 -07:00
committed by GitHub
parent 91b88262d7
commit 7acebf11ea
10 changed files with 1086 additions and 32 deletions

View File

@@ -2,6 +2,7 @@ package topology
import (
"errors"
"fmt"
"math/rand/v2"
"strings"
"sync"
@@ -16,15 +17,124 @@ import (
)
type NodeId string
// CapacityReservation represents a temporary reservation of capacity
type CapacityReservation struct {
reservationId string
diskType types.DiskType
count int64
createdAt time.Time
}
// CapacityReservations manages capacity reservations for a node
type CapacityReservations struct {
sync.RWMutex
reservations map[string]*CapacityReservation
reservedCounts map[types.DiskType]int64
}
func newCapacityReservations() *CapacityReservations {
return &CapacityReservations{
reservations: make(map[string]*CapacityReservation),
reservedCounts: make(map[types.DiskType]int64),
}
}
func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string {
cr.Lock()
defer cr.Unlock()
return cr.doAddReservation(diskType, count)
}
func (cr *CapacityReservations) removeReservation(reservationId string) bool {
cr.Lock()
defer cr.Unlock()
if reservation, exists := cr.reservations[reservationId]; exists {
delete(cr.reservations, reservationId)
cr.decrementCount(reservation.diskType, reservation.count)
return true
}
return false
}
func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 {
cr.RLock()
defer cr.RUnlock()
return cr.reservedCounts[diskType]
}
// decrementCount is a helper to decrement reserved count and clean up zero entries
func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
cr.reservedCounts[diskType] -= count
// Clean up zero counts to prevent map growth
if cr.reservedCounts[diskType] <= 0 {
delete(cr.reservedCounts, diskType)
}
}
// doAddReservation is a helper to add a reservation, assuming the lock is already held
func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string {
now := time.Now()
reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64())
cr.reservations[reservationId] = &CapacityReservation{
reservationId: reservationId,
diskType: diskType,
count: count,
createdAt: now,
}
cr.reservedCounts[diskType] += count
return reservationId
}
// tryReserveAtomic atomically checks available space and reserves if possible
func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
cr.Lock()
defer cr.Unlock()
// Check available space under lock
currentReserved := cr.reservedCounts[diskType]
availableSpace := availableSpaceFunc() - currentReserved
if availableSpace >= count {
// Create and add reservation atomically
return cr.doAddReservation(diskType, count), true
}
return "", false
}
func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
cr.Lock()
defer cr.Unlock()
now := time.Now()
for id, reservation := range cr.reservations {
if now.Sub(reservation.createdAt) > expirationDuration {
delete(cr.reservations, id)
cr.decrementCount(reservation.diskType, reservation.count)
glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
}
}
}
type Node interface {
Id() NodeId
String() string
AvailableSpaceFor(option *VolumeGrowOption) int64
ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error)
UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
UpAdjustMaxVolumeId(vid needle.VolumeId)
GetDiskUsages() *DiskUsages
// Capacity reservation methods for avoiding race conditions
TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool)
ReleaseReservedCapacity(reservationId string)
AvailableSpaceForReservation(option *VolumeGrowOption) int64
GetMaxVolumeId() needle.VolumeId
SetParent(Node)
LinkChildNode(node Node)
@@ -52,6 +162,9 @@ type NodeImpl struct {
//for rack, data center, topology
nodeType string
value interface{}
// capacity reservations to prevent race conditions during volume creation
capacityReservations *CapacityReservations
}
func (n *NodeImpl) GetDiskUsages() *DiskUsages {
@@ -164,6 +277,42 @@ func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
}
return freeVolumeSlotCount
}
// AvailableSpaceForReservation returns available space considering existing reservations
func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 {
baseAvailable := n.AvailableSpaceFor(option)
reservedCount := n.capacityReservations.getReservedCount(option.DiskType)
return baseAvailable - reservedCount
}
// TryReserveCapacity attempts to atomically reserve capacity for volume creation
func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
const reservationTimeout = 5 * time.Minute // TODO: make this configurable
// Clean up any expired reservations first
n.capacityReservations.cleanExpiredReservations(reservationTimeout)
// Atomically check and reserve space
option := &VolumeGrowOption{DiskType: diskType}
reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
return n.AvailableSpaceFor(option)
})
if success {
glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
}
return reservationId, success
}
// ReleaseReservedCapacity releases a previously reserved capacity
func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) {
if n.capacityReservations.removeReservation(reservationId) {
glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId)
} else {
glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId)
}
}
func (n *NodeImpl) SetParent(node Node) {
n.parent = node
}
@@ -186,10 +335,24 @@ func (n *NodeImpl) GetValue() interface{} {
}
func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
return n.reserveOneVolumeInternal(r, option, false)
}
// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks
func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
return n.reserveOneVolumeInternal(r, option, true)
}
func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) {
n.RLock()
defer n.RUnlock()
for _, node := range n.children {
freeSpace := node.AvailableSpaceFor(option)
var freeSpace int64
if useReservations {
freeSpace = node.AvailableSpaceForReservation(option)
} else {
freeSpace = node.AvailableSpaceFor(option)
}
// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
if freeSpace <= 0 {
continue
@@ -197,7 +360,13 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
if r >= freeSpace {
r -= freeSpace
} else {
if node.IsDataNode() && node.AvailableSpaceFor(option) > 0 {
var hasSpace bool
if useReservations {
hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0
} else {
hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0
}
if hasSpace {
// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
dn := node.(*DataNode)
if dn.IsTerminating {
@@ -205,7 +374,11 @@ func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assigned
}
return dn, nil
}
assignedNode, err = node.ReserveOneVolume(r, option)
if useReservations {
assignedNode, err = node.ReserveOneVolumeForReservation(r, option)
} else {
assignedNode, err = node.ReserveOneVolume(r, option)
}
if err == nil {
return
}