Fix disk error handling in vacuum compaction (#8244)
When a disk reports I/O errors during vacuum compaction (e.g., 'read /mnt/d1/weed/oc_xyz.dat: input/output error'), the vacuum task should signal the error to the master so that it can:
1. Drop the faulty volume replica
2. Rebuild the replica from healthy copies

Changes:
- Add checkReadWriteError() calls in vacuum read paths (ReadNeedleBlob, ReadData, ScanVolumeFile) to flag EIO errors in volume.lastIoError
- Preserve error wrapping by using the %w format verb instead of %v so that EIO propagates correctly
- The existing heartbeat logic will detect lastIoError and remove the bad volume

Fixes issue #8237
This commit is contained in:
@@ -219,10 +219,10 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
|
|||||||
volumeFileScanner VolumeFileScanner) (err error) {
|
volumeFileScanner VolumeFileScanner) (err error) {
|
||||||
var v *Volume
|
var v *Volume
|
||||||
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind, needle.GetCurrentVersion()); err != nil {
|
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind, needle.GetCurrentVersion()); err != nil {
|
||||||
return fmt.Errorf("failed to load volume %d: %v", id, err)
|
return fmt.Errorf("failed to load volume %d: %w", id, err)
|
||||||
}
|
}
|
||||||
if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
|
if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
|
||||||
return fmt.Errorf("failed to process volume %d super block: %v", id, err)
|
return fmt.Errorf("failed to process volume %d super block: %w", id, err)
|
||||||
}
|
}
|
||||||
defer v.Close()
|
defer v.Close()
|
||||||
|
|
||||||
@@ -239,7 +239,7 @@ func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorag
|
|||||||
if e == io.EOF {
|
if e == io.EOF {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
|
return fmt.Errorf("cannot read %s at offset %d: %w", datBackend.Name(), offset, e)
|
||||||
}
|
}
|
||||||
for n != nil {
|
for n != nil {
|
||||||
var needleBody []byte
|
var needleBody []byte
|
||||||
|
|||||||
@@ -326,7 +326,8 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
|
|||||||
var needleBytes []byte
|
var needleBytes []byte
|
||||||
needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
|
needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
|
v.checkReadWriteError(err)
|
||||||
|
return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %w", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
|
||||||
}
|
}
|
||||||
dstDatBackend.Write(needleBytes)
|
dstDatBackend.Write(needleBytes)
|
||||||
if err := dstDatBackend.Sync(); err != nil {
|
if err := dstDatBackend.Sync(); err != nil {
|
||||||
@@ -421,6 +422,7 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
|
|||||||
}
|
}
|
||||||
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
|
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
v.checkReadWriteError(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -476,7 +478,8 @@ func (v *Volume) copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, da
|
|||||||
|
|
||||||
n := new(needle.Needle)
|
n := new(needle.Needle)
|
||||||
if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil {
|
if err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version); err != nil {
|
||||||
return fmt.Errorf("cannot hydrate needle from file: %s", err)
|
v.checkReadWriteError(err)
|
||||||
|
return fmt.Errorf("cannot hydrate needle from file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
|
if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
|
||||||
|
|||||||
Reference in New Issue
Block a user