Parallelize operations for the volume.scrub and ec.scrub commands (#8247)

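Parallelize operations for the `volume.scrub` and `ec.scrub` commands. Volume servers are now scrubbed concurrently through an `ErrorWaitGroup` bounded by a new `-maxParallelization` flag (default `DefaultMaxParallelization`), instead of one at a time. Result counters and detail lists shared across tasks are guarded by a mutex, and the "(n/total)" progress counter now reflects the order in which tasks start rather than each server's position in the address list.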
This commit is contained in:
Lisandro Pin
2026-02-09 18:07:06 +01:00
committed by GitHub
parent c9428c2c0f
commit 63b846b73b
2 changed files with 84 additions and 58 deletions

View File

@@ -7,6 +7,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -50,7 +51,7 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
// TODO: switch default mode to LOCAL, once implemented.
mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
// TODO: add per-node parallelization
maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
if err = volScrubCommand.Parse(args); err != nil {
return err
@@ -102,45 +103,57 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
fmt.Fprintf(writer, "using %s mode\n", c.mode.String())
c.env = commandEnv
return c.scrubEcVolumes(writer)
return c.scrubEcVolumes(writer, *maxParallelization)
}
func (c *commandEcVolumeScrub) scrubEcVolumes(writer io.Writer) error {
func (c *commandEcVolumeScrub) scrubEcVolumes(writer io.Writer, maxParallelization int) error {
var brokenVolumesStr, brokenShardsStr []string
var details []string
var totalVolumes, brokenVolumes, brokenShards, totalFiles uint64
var mu sync.Mutex
for i, addr := range c.volumeServerAddrs {
fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), i+1, len(c.volumeServerAddrs))
ewg := NewErrorWaitGroup(maxParallelization)
count := 0
for _, addr := range c.volumeServerAddrs {
ewg.Add(func() error {
mu.Lock()
count++
fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), count, len(c.volumeServerAddrs))
mu.Unlock()
err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
res, err := volumeServerClient.ScrubEcVolume(context.Background(), &volume_server_pb.ScrubEcVolumeRequest{
Mode: c.mode,
VolumeIds: c.volumeIDs,
err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
res, err := volumeServerClient.ScrubEcVolume(context.Background(), &volume_server_pb.ScrubEcVolumeRequest{
Mode: c.mode,
VolumeIds: c.volumeIDs,
})
if err != nil {
return err
}
mu.Lock()
defer mu.Unlock()
totalVolumes += res.GetTotalVolumes()
totalFiles += res.GetTotalFiles()
brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
brokenShards += uint64(len(res.GetBrokenShardInfos()))
for _, d := range res.GetDetails() {
details = append(details, fmt.Sprintf("[%s] %s", addr, d))
}
for _, vid := range res.GetBrokenVolumeIds() {
brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
}
for _, si := range res.GetBrokenShardInfos() {
brokenShardsStr = append(brokenShardsStr, fmt.Sprintf("%s:%v:%v", addr, si.VolumeId, si.ShardId))
}
return nil
})
if err != nil {
return err
}
totalVolumes += res.GetTotalVolumes()
totalFiles += res.GetTotalFiles()
brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
brokenShards += uint64(len(res.GetBrokenShardInfos()))
for _, d := range res.GetDetails() {
details = append(details, fmt.Sprintf("[%s] %s", addr, d))
}
for _, vid := range res.GetBrokenVolumeIds() {
brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
}
for _, si := range res.GetBrokenShardInfos() {
brokenShardsStr = append(brokenShardsStr, fmt.Sprintf("%s:%v:%v", addr, si.VolumeId, si.ShardId))
}
return nil
})
if err != nil {
return err
}
})
}
if err := ewg.Wait(); err != nil {
return err
}
fmt.Fprintf(writer, "Scrubbed %d EC files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))

View File

@@ -7,6 +7,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -51,7 +52,7 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated volume IDs to process (optional)")
// TODO: switch default mode to LOCAL, once implemented.
mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
// TODO: add per-node parallelization
maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
if err = volScrubCommand.Parse(args); err != nil {
return err
@@ -103,41 +104,53 @@ func (c *commandVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer io
fmt.Fprintf(writer, "using %s mode\n", c.mode.String())
c.env = commandEnv
return c.scrubVolumes(writer)
return c.scrubVolumes(writer, *maxParallelization)
}
func (c *commandVolumeScrub) scrubVolumes(writer io.Writer) error {
func (c *commandVolumeScrub) scrubVolumes(writer io.Writer, maxParallelization int) error {
var brokenVolumesStr []string
var details []string
var totalVolumes, brokenVolumes, totalFiles uint64
var mu sync.Mutex
for i, addr := range c.volumeServerAddrs {
fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), i+1, len(c.volumeServerAddrs))
ewg := NewErrorWaitGroup(maxParallelization)
count := 0
for _, addr := range c.volumeServerAddrs {
ewg.Add(func() error {
mu.Lock()
count++
fmt.Fprintf(writer, "Scrubbing %s (%d/%d)...\n", addr.String(), count, len(c.volumeServerAddrs))
mu.Unlock()
err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
Mode: c.mode,
VolumeIds: c.volumeIDs,
err := operation.WithVolumeServerClient(false, addr, c.env.option.GrpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
res, err := volumeServerClient.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
Mode: c.mode,
VolumeIds: c.volumeIDs,
})
if err != nil {
return err
}
mu.Lock()
defer mu.Unlock()
totalVolumes += res.GetTotalVolumes()
totalFiles += res.GetTotalFiles()
brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
for _, d := range res.GetDetails() {
details = append(details, fmt.Sprintf("[%s] %s", addr, d))
}
for _, vid := range res.GetBrokenVolumeIds() {
brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
}
return nil
})
if err != nil {
return err
}
totalVolumes += res.GetTotalVolumes()
totalFiles += res.GetTotalFiles()
brokenVolumes += uint64(len(res.GetBrokenVolumeIds()))
for _, d := range res.GetDetails() {
details = append(details, fmt.Sprintf("[%s] %s", addr, d))
}
for _, vid := range res.GetBrokenVolumeIds() {
brokenVolumesStr = append(brokenVolumesStr, fmt.Sprintf("%s:%v", addr, vid))
}
return nil
})
if err != nil {
return err
}
})
}
if err := ewg.Wait(); err != nil {
return err
}
fmt.Fprintf(writer, "Scrubbed %d files and %d volumes on %d nodes\n", totalFiles, totalVolumes, len(c.volumeServerAddrs))