Implement full scrubbing for EC volumes (#8318)
Implement full scrubbing for EC volumes.
This commit is contained in:
@@ -159,7 +159,7 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadS
|
||||
if len(intervals) > 1 {
|
||||
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
|
||||
}
|
||||
bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
|
||||
bytes, isDeleted, err := s.readEcShardIntervals(n.Id, localEcVolume, intervals)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("ReadEcShardIntervals: %w", err)
|
||||
}
|
||||
@@ -178,8 +178,11 @@ func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle, onReadS
|
||||
return 0, fmt.Errorf("ec shard %d not found", vid)
|
||||
}
|
||||
|
||||
func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
func (s *Store) IntervalToShardIdAndOffset(iv erasure_coding.Interval) (erasure_coding.ShardId, int64) {
|
||||
return iv.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
|
||||
}
|
||||
|
||||
func (s *Store) readEcShardIntervals(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
|
||||
return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
|
||||
}
|
||||
@@ -202,37 +205,36 @@ func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleI
|
||||
}
|
||||
|
||||
func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
|
||||
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
|
||||
shardId, actualOffset := s.IntervalToShardIdAndOffset(interval)
|
||||
data = make([]byte, interval.Size)
|
||||
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
|
||||
var readSize int
|
||||
if readSize, err = shard.ReadAt(data, actualOffset); err != nil {
|
||||
if readSize != int(interval.Size) {
|
||||
glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ecVolume.ShardLocationsLock.RLock()
|
||||
sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
|
||||
ecVolume.ShardLocationsLock.RUnlock()
|
||||
|
||||
// try reading directly
|
||||
if hasShardIdLocation {
|
||||
_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
|
||||
}
|
||||
// try local read
|
||||
err = s.readLocalEcShardInterval(ecVolume, shardId, data, actualOffset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
|
||||
|
||||
// try reading by recovering from other shards
|
||||
_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
|
||||
ecVolume.ShardLocationsLock.RLock()
|
||||
sourceDataNodes, hasShardIdLocation := ecVolume.ShardLocations[shardId]
|
||||
ecVolume.ShardLocationsLock.RUnlock()
|
||||
|
||||
// try reading directly
|
||||
if hasShardIdLocation {
|
||||
_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
|
||||
glog.V(0).Infof("read remote ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
|
||||
}
|
||||
|
||||
// try reading by recovering from other shards
|
||||
_, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
glog.V(0).Infof("recover ec shard %d.%d : %v", ecVolume.VolumeId, shardId, err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -286,6 +288,23 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Store) readLocalEcShardInterval(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.ShardId, buf []byte, offset int64) error {
|
||||
shard, found := ecVolume.FindEcVolumeShard(shardId)
|
||||
if !found {
|
||||
return fmt.Errorf("shard %d not found for volume %d", shardId, ecVolume.VolumeId)
|
||||
}
|
||||
|
||||
readBytes, err := shard.ReadAt(buf, offset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read local EC shard %d for volume %d: %v", shardId, ecVolume.VolumeId, err)
|
||||
}
|
||||
if got, want := readBytes, len(buf); got != want {
|
||||
return fmt.Errorf("expected %d bytes for local EC shard %d on volume %d, got %d", want, shardId, ecVolume.VolumeId, got)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
|
||||
|
||||
if len(sourceDataNodes) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user