Respect -minFreeSpace during ec.decode (#8467)
* shell: add ec.decode ignoreMinFreeSpace flag Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * shell: respect minFreeSpace in ec.decode Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * shell: rename ec.decode minFreeSpace flag Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * shell: error when ec.decode has no shards Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * shell: select ec.decode target with zero shards Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * shell: adjust free counts across ec.decode Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * unused * Update weed/shell/command_ec_decode.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This commit is contained in:
@@ -35,7 +35,7 @@ func (c *commandEcDecode) Name() string {
|
||||
func (c *commandEcDecode) Help() string {
|
||||
return `decode a erasure coded volume into a normal volume
|
||||
|
||||
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>]
|
||||
ec.decode [-collection=""] [-volumeId=<volume_id>] [-diskType=<disk_type>] [-checkMinFreeSpace]
|
||||
|
||||
The -collection parameter supports regular expressions for pattern matching:
|
||||
- Use exact match: ec.decode -collection="^mybucket$"
|
||||
@@ -44,6 +44,7 @@ func (c *commandEcDecode) Help() string {
|
||||
|
||||
Options:
|
||||
-diskType: source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)
|
||||
-checkMinFreeSpace: check min free space when selecting the decode target (default true)
|
||||
|
||||
Examples:
|
||||
# Decode EC shards from HDD (default)
|
||||
@@ -64,6 +65,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
volumeId := decodeCommand.Int("volumeId", 0, "the volume id")
|
||||
collection := decodeCommand.String("collection", "", "the collection name")
|
||||
diskTypeStr := decodeCommand.String("diskType", "", "source disk type where EC shards are stored (hdd, ssd, or empty for default hdd)")
|
||||
checkMinFreeSpace := decodeCommand.Bool("checkMinFreeSpace", true, "check min free space when selecting the decode target")
|
||||
if err = decodeCommand.Parse(args); err != nil {
|
||||
return nil
|
||||
}
|
||||
@@ -80,10 +82,14 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var diskUsageState *decodeDiskUsageState
|
||||
if *checkMinFreeSpace {
|
||||
diskUsageState = newDecodeDiskUsageState(topologyInfo, diskType)
|
||||
}
|
||||
|
||||
// volumeId is provided
|
||||
if vid != 0 {
|
||||
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType)
|
||||
return doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState)
|
||||
}
|
||||
|
||||
// apply to all volumes in the collection
|
||||
@@ -93,7 +99,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
}
|
||||
fmt.Printf("ec decode volumes: %v\n", volumeIds)
|
||||
for _, vid := range volumeIds {
|
||||
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType); err != nil {
|
||||
if err = doEcDecode(commandEnv, topologyInfo, *collection, vid, diskType, *checkMinFreeSpace, diskUsageState); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -101,7 +107,7 @@ func (c *commandEcDecode) Do(args []string, commandEnv *CommandEnv, writer io.Wr
|
||||
return nil
|
||||
}
|
||||
|
||||
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType) (err error) {
|
||||
func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collection string, vid needle.VolumeId, diskType types.DiskType, checkMinFreeSpace bool, diskUsageState *decodeDiskUsageState) (err error) {
|
||||
|
||||
if !commandEnv.isLocked() {
|
||||
return fmt.Errorf("lock is lost")
|
||||
@@ -112,8 +118,36 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
|
||||
|
||||
fmt.Printf("ec volume %d shard locations: %+v\n", vid, nodeToEcShardsInfo)
|
||||
|
||||
if len(nodeToEcShardsInfo) == 0 {
|
||||
return fmt.Errorf("no EC shards found for volume %d (diskType %s)", vid, diskType.ReadableString())
|
||||
}
|
||||
|
||||
var originalShardCounts map[pb.ServerAddress]int
|
||||
if diskUsageState != nil {
|
||||
originalShardCounts = make(map[pb.ServerAddress]int, len(nodeToEcShardsInfo))
|
||||
for location, si := range nodeToEcShardsInfo {
|
||||
originalShardCounts[location] = si.Count()
|
||||
}
|
||||
}
|
||||
|
||||
var eligibleTargets map[pb.ServerAddress]struct{}
|
||||
if checkMinFreeSpace {
|
||||
if diskUsageState == nil {
|
||||
return fmt.Errorf("min free space checking requires disk usage state")
|
||||
}
|
||||
eligibleTargets = make(map[pb.ServerAddress]struct{})
|
||||
for location := range nodeToEcShardsInfo {
|
||||
if freeCount, found := diskUsageState.freeVolumeCount(location); found && freeCount > 0 {
|
||||
eligibleTargets[location] = struct{}{}
|
||||
}
|
||||
}
|
||||
if len(eligibleTargets) == 0 {
|
||||
return fmt.Errorf("no eligible target datanodes with free volume slots for volume %d (diskType %s); use -checkMinFreeSpace=false to override", vid, diskType.ReadableString())
|
||||
}
|
||||
}
|
||||
|
||||
// collect ec shards to the server with most space
|
||||
targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid)
|
||||
targetNodeLocation, err := collectEcShards(commandEnv, nodeToEcShardsInfo, collection, vid, eligibleTargets)
|
||||
if err != nil {
|
||||
return fmt.Errorf("collectEcShards for volume %d: %v", vid, err)
|
||||
}
|
||||
@@ -124,7 +158,13 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
|
||||
// Special case: if the EC index has no live entries, decoding is a no-op.
|
||||
// Just purge EC shards and return success without generating/mounting an empty volume.
|
||||
if isEcDecodeEmptyVolumeErr(err) {
|
||||
return unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid)
|
||||
if err := unmountAndDeleteEcShards(commandEnv.option.GrpcDialOption, collection, nodeToEcShardsInfo, vid); err != nil {
|
||||
return err
|
||||
}
|
||||
if diskUsageState != nil {
|
||||
diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("generate normal volume %d on %s: %v", vid, targetNodeLocation, err)
|
||||
}
|
||||
@@ -134,6 +174,9 @@ func doEcDecode(commandEnv *CommandEnv, topoInfo *master_pb.TopologyInfo, collec
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete ec shards for volume %d: %v", vid, err)
|
||||
}
|
||||
if diskUsageState != nil {
|
||||
diskUsageState.applyDecode(targetNodeLocation, originalShardCounts, true)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -207,11 +250,16 @@ func generateNormalVolume(grpcDialOption grpc.DialOption, vid needle.VolumeId, c
|
||||
|
||||
}
|
||||
|
||||
func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId) (targetNodeLocation pb.ServerAddress, err error) {
|
||||
func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddress]*erasure_coding.ShardsInfo, collection string, vid needle.VolumeId, eligibleTargets map[pb.ServerAddress]struct{}) (targetNodeLocation pb.ServerAddress, err error) {
|
||||
|
||||
maxShardCount := 0
|
||||
maxShardCount := -1
|
||||
existingShardsInfo := erasure_coding.NewShardsInfo()
|
||||
for loc, si := range nodeToShardsInfo {
|
||||
if eligibleTargets != nil {
|
||||
if _, ok := eligibleTargets[loc]; !ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
toBeCopiedShardCount := si.MinusParityShards().Count()
|
||||
if toBeCopiedShardCount > maxShardCount {
|
||||
maxShardCount = toBeCopiedShardCount
|
||||
@@ -219,6 +267,9 @@ func collectEcShards(commandEnv *CommandEnv, nodeToShardsInfo map[pb.ServerAddre
|
||||
existingShardsInfo = si
|
||||
}
|
||||
}
|
||||
if targetNodeLocation == "" {
|
||||
return "", fmt.Errorf("no eligible target datanodes available to decode volume %d", vid)
|
||||
}
|
||||
|
||||
fmt.Printf("collectEcShards: ec volume %d collect shards to %s from: %+v\n", vid, targetNodeLocation, nodeToShardsInfo)
|
||||
|
||||
@@ -326,3 +377,58 @@ func collectEcNodeShardsInfo(topoInfo *master_pb.TopologyInfo, vid needle.Volume
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
type decodeDiskUsageState struct {
|
||||
byNode map[pb.ServerAddress]*decodeDiskUsageCounts
|
||||
}
|
||||
|
||||
type decodeDiskUsageCounts struct {
|
||||
maxVolumeCount int64
|
||||
volumeCount int64
|
||||
remoteVolumeCount int64
|
||||
ecShardCount int64
|
||||
}
|
||||
|
||||
func newDecodeDiskUsageState(topoInfo *master_pb.TopologyInfo, diskType types.DiskType) *decodeDiskUsageState {
|
||||
state := &decodeDiskUsageState{byNode: make(map[pb.ServerAddress]*decodeDiskUsageCounts)}
|
||||
eachDataNode(topoInfo, func(dc DataCenterId, rack RackId, dn *master_pb.DataNodeInfo) {
|
||||
if diskInfo, found := dn.DiskInfos[string(diskType)]; found {
|
||||
state.byNode[pb.NewServerAddressFromDataNode(dn)] = &decodeDiskUsageCounts{
|
||||
maxVolumeCount: diskInfo.MaxVolumeCount,
|
||||
volumeCount: diskInfo.VolumeCount,
|
||||
remoteVolumeCount: diskInfo.RemoteVolumeCount,
|
||||
ecShardCount: int64(countShards(diskInfo.EcShardInfos)),
|
||||
}
|
||||
}
|
||||
})
|
||||
return state
|
||||
}
|
||||
|
||||
func (state *decodeDiskUsageState) freeVolumeCount(location pb.ServerAddress) (int64, bool) {
|
||||
if state == nil {
|
||||
return 0, false
|
||||
}
|
||||
usage, found := state.byNode[location]
|
||||
if !found {
|
||||
return 0, false
|
||||
}
|
||||
free := usage.maxVolumeCount - (usage.volumeCount - usage.remoteVolumeCount)
|
||||
free -= (usage.ecShardCount + int64(erasure_coding.DataShardsCount) - 1) / int64(erasure_coding.DataShardsCount)
|
||||
return free, true
|
||||
}
|
||||
|
||||
func (state *decodeDiskUsageState) applyDecode(targetNodeLocation pb.ServerAddress, shardCounts map[pb.ServerAddress]int, createdVolume bool) {
|
||||
if state == nil {
|
||||
return
|
||||
}
|
||||
for location, shardCount := range shardCounts {
|
||||
if usage, found := state.byNode[location]; found {
|
||||
usage.ecShardCount -= int64(shardCount)
|
||||
}
|
||||
}
|
||||
if createdVolume {
|
||||
if usage, found := state.byNode[targetNodeLocation]; found {
|
||||
usage.volumeCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user