Implement local scrubbing for EC volumes. (#8283)
This commit is contained in:
@@ -99,9 +99,9 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
|
|||||||
switch m := req.GetMode(); m {
|
switch m := req.GetMode(); m {
|
||||||
case volume_server_pb.VolumeScrubMode_INDEX:
|
case volume_server_pb.VolumeScrubMode_INDEX:
|
||||||
// index scrubs do not verify individual EC shards
|
// index scrubs do not verify individual EC shards
|
||||||
files, serrs = v.CheckIndex()
|
files, serrs = v.ScrubIndex()
|
||||||
case volume_server_pb.VolumeScrubMode_LOCAL:
|
case volume_server_pb.VolumeScrubMode_LOCAL:
|
||||||
files, shardInfos, serrs = scrubEcVolumeLocal(ctx, v)
|
files, shardInfos, serrs = v.ScrubLocal()
|
||||||
case volume_server_pb.VolumeScrubMode_FULL:
|
case volume_server_pb.VolumeScrubMode_FULL:
|
||||||
files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
|
files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
|
||||||
default:
|
default:
|
||||||
@@ -129,10 +129,6 @@ func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func scrubEcVolumeLocal(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
|
|
||||||
return 0, nil, []error{fmt.Errorf("scrubEcVolumeLocal(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
|
|
||||||
}
|
|
||||||
|
|
||||||
func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
|
func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (int64, []*volume_server_pb.EcShardInfo, []error) {
|
||||||
return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
|
return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented, see https://github.com/seaweedfs/seaweedfs/issues/8018")}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,8 +49,7 @@ func (c *commandEcVolumeScrub) Do(args []string, commandEnv *CommandEnv, writer
|
|||||||
volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
volScrubCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
|
||||||
nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
|
nodesStr := volScrubCommand.String("node", "", "comma-separated list of volume server <host>:<port> (optional)")
|
||||||
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
|
volumeIDsStr := volScrubCommand.String("volumeId", "", "comma-separated EC volume IDs to process (optional)")
|
||||||
// TODO: switch default mode to LOCAL, once implemented.
|
mode := volScrubCommand.String("mode", "local", "scrubbing mode (index/local/full)")
|
||||||
mode := volScrubCommand.String("mode", "index", "scrubbing mode (index/local/full)")
|
|
||||||
maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
|
maxParallelization := volScrubCommand.Int("maxParallelization", DefaultMaxParallelization, "run up to X tasks in parallel, whenever possible")
|
||||||
|
|
||||||
if err = volScrubCommand.Parse(args); err != nil {
|
if err = volScrubCommand.Parse(args); err != nil {
|
||||||
|
|||||||
@@ -339,14 +339,3 @@ func (ev *EcVolume) WalkIndex(processNeedleFn func(key types.NeedleId, offset ty
|
|||||||
}
|
}
|
||||||
return idx.WalkIndexFile(ev.ecxFile, 0, processNeedleFn)
|
return idx.WalkIndexFile(ev.ecxFile, 0, processNeedleFn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ev *EcVolume) CheckIndex() (int64, []error) {
|
|
||||||
if ev.ecxFile == nil {
|
|
||||||
return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)}
|
|
||||||
}
|
|
||||||
if ev.ecxFileSize == 0 {
|
|
||||||
return 0, []error{fmt.Errorf("zero-size ECX file for EC volume %v", ev.VolumeId)}
|
|
||||||
}
|
|
||||||
|
|
||||||
return idx.CheckIndexFile(ev.ecxFile, ev.ecxFileSize, ev.Version)
|
|
||||||
}
|
|
||||||
|
|||||||
103
weed/storage/erasure_coding/ec_volume_scrub.go
Normal file
103
weed/storage/erasure_coding/ec_volume_scrub.go
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
package erasure_coding
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"slices"
|
||||||
|
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/idx"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
|
||||||
|
"github.com/seaweedfs/seaweedfs/weed/storage/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ScrubIndex verifies index integrity of an EC volume.
|
||||||
|
func (ev *EcVolume) ScrubIndex() (int64, []error) {
|
||||||
|
if ev.ecxFile == nil {
|
||||||
|
return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)}
|
||||||
|
}
|
||||||
|
if ev.ecxFileSize == 0 {
|
||||||
|
return 0, []error{fmt.Errorf("zero-size ECX file for EC volume %v", ev.VolumeId)}
|
||||||
|
}
|
||||||
|
|
||||||
|
return idx.CheckIndexFile(ev.ecxFile, ev.ecxFileSize, ev.Version)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ScrubLocal checks the integrity of local shards for a EC volume. Notably, it cannot verify CRC on needles.
|
||||||
|
// Returns a count of processed file entries, slice of found broken shards, and slice of found errors.
|
||||||
|
func (ecv *EcVolume) ScrubLocal() (int64, []*volume_server_pb.EcShardInfo, []error) {
|
||||||
|
// local scan means verifying indexes as well
|
||||||
|
_, errs := ecv.ScrubIndex()
|
||||||
|
|
||||||
|
brokenShardsMap := map[ShardId]*EcVolumeShard{}
|
||||||
|
var count int64
|
||||||
|
|
||||||
|
flagShardBroken := func(ecs *EcVolumeShard, errFmt string, a ...any) {
|
||||||
|
// reads for EC chunks can hit the same shard multiple times, so dedupe upon read errors
|
||||||
|
brokenShardsMap[ecs.ShardId] = ecs
|
||||||
|
errs = append(errs, fmt.Errorf(errFmt, a...))
|
||||||
|
}
|
||||||
|
|
||||||
|
err := idx.WalkIndexFile(ecv.ecxFile, 0, func(id types.NeedleId, offset types.Offset, size types.Size) error {
|
||||||
|
count++
|
||||||
|
if size.IsTombstone() {
|
||||||
|
// nothing to do for tombstones...
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var read int64
|
||||||
|
locations := ecv.LocateEcShardNeedleInterval(ecv.Version, offset.ToActualOffset(), size)
|
||||||
|
|
||||||
|
for i, iv := range locations {
|
||||||
|
sid, soffset := iv.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
|
||||||
|
ssize := int64(iv.Size.Raw())
|
||||||
|
shard, found := ecv.FindEcVolumeShard(sid)
|
||||||
|
if !found {
|
||||||
|
// shard is not local :( skip it
|
||||||
|
read += ssize
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if soffset+int64(ssize) > shard.Size() {
|
||||||
|
flagShardBroken(shard, "local shard %d for needle %d is too short (%d), cannot read chunk %d/%d", sid, id, shard.Size(), i+1, len(locations))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, ssize)
|
||||||
|
got, err := shard.ReadAt(buf, soffset)
|
||||||
|
if err != nil {
|
||||||
|
flagShardBroken(shard, "failed to read chunk %d/%d for needle %d from local shard %d at offset %d: %v", i+1, len(locations), id, sid, soffset, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if int64(got) != ssize {
|
||||||
|
flagShardBroken(shard, "expected %d bytes for chunk %d/%d for needle %d from local shard %d, got %d", ssize, i+1, len(locations), id, sid, got)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
read += int64(got)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := read, needle.GetActualSize(size, ecv.Version); got != want {
|
||||||
|
return fmt.Errorf("expected %d bytes for needle %d, got %d", want, id, got)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
errs = append(errs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// collect broken shard infos for reporting
|
||||||
|
brokenShards := make([]*volume_server_pb.EcShardInfo, 0, len(brokenShardsMap))
|
||||||
|
for _, s := range brokenShardsMap {
|
||||||
|
brokenShards = append(brokenShards, s.ToEcShardInfo())
|
||||||
|
}
|
||||||
|
slices.SortFunc(brokenShards, func(a, b *volume_server_pb.EcShardInfo) int {
|
||||||
|
if a.ShardId < b.ShardId {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
if a.ShardId > b.ShardId {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
})
|
||||||
|
|
||||||
|
return count, brokenShards, errs
|
||||||
|
}
|
||||||
@@ -13,7 +13,7 @@ import (
|
|||||||
|
|
||||||
func TestPositioning(t *testing.T) {
|
func TestPositioning(t *testing.T) {
|
||||||
|
|
||||||
ecxFile, err := os.OpenFile("../idx/test_files/389.ecx", os.O_RDONLY, 0)
|
ecxFile, err := os.OpenFile("./test_files/389.ecx", os.O_RDONLY, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to open ecx file: %v", err)
|
t.Errorf("failed to open ecx file: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,13 +50,6 @@ func TestCheckIndexFile(t *testing.T) {
|
|||||||
fmt.Errorf("expected an index file of size 2540, got 2528"),
|
fmt.Errorf("expected an index file of size 2540, got 2528"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "healthy EC index",
|
|
||||||
indexPath: "./test_files/389.ecx",
|
|
||||||
version: needle.Version3,
|
|
||||||
want: 485098,
|
|
||||||
wantErrs: []error{},
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "healthy EC index with deleted files",
|
name: "healthy EC index with deleted files",
|
||||||
indexPath: "./test_files/deleted_files.ecx",
|
indexPath: "./test_files/deleted_files.ecx",
|
||||||
|
|||||||
@@ -14,6 +14,10 @@ type Offset struct {
|
|||||||
|
|
||||||
type Size int32
|
type Size int32
|
||||||
|
|
||||||
|
func (s Size) IsTombstone() bool {
|
||||||
|
return s == TombstoneFileSize
|
||||||
|
}
|
||||||
|
|
||||||
// IsDeleted checks if the needle entry has been marked as deleted (tombstoned).
|
// IsDeleted checks if the needle entry has been marked as deleted (tombstoned).
|
||||||
// Use this when checking if an entry should exist in the needle map.
|
// Use this when checking if an entry should exist in the needle map.
|
||||||
// Returns true for negative sizes or TombstoneFileSize.
|
// Returns true for negative sizes or TombstoneFileSize.
|
||||||
|
|||||||
Reference in New Issue
Block a user