Files
seaweedFS/weed/storage/store_vacuum.go
Chris Lu af68449a26 Process .ecj deletions during EC decode and vacuum decoded volume (#8863)
* Process .ecj deletions during EC decode and vacuum decoded volume (#8798)

When decoding EC volumes back to normal volumes, deletions recorded in
the .ecj journal were not being applied before computing the dat file
size or checking for live needles. This caused the decoded volume to
include data for deleted files and could produce false positives in the
all-deleted check.

- Call RebuildEcxFile before HasLiveNeedles/FindDatFileSize in
  VolumeEcShardsToVolume so .ecj deletions are merged into .ecx first
- Vacuum the decoded volume after mounting in ec.decode to compact out
  deleted needle data from the .dat file
- Add integration tests for decoding with non-empty .ecj files

* storage: add offline volume compaction helper

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* ec: compact decoded volumes before deleting shards

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* ec: address PR review comments

- Fall back to data directory for .ecx when idx directory lacks it
- Make compaction failure non-fatal during EC decode
- Remove misleading "buffer: 10%" from space check error message

* ec: collect .ecj from all shard locations during decode

Each server's .ecj only contains deletions for needles whose data
resides in shards held by that server. Previously, sources with no
new data shards to contribute were skipped entirely, losing their
.ecj deletion entries. Now .ecj is always appended from every shard
location so RebuildEcxFile sees the full set of deletions.

* ec: add integration tests for .ecj collection during decode

TestEcDecodePreservesDeletedNeedles: verifies that needles deleted
via VolumeEcBlobDelete are excluded from the decoded volume.

TestEcDecodeCollectsEcjFromPeer: regression test for the fix in
collectEcShards. Deletes a needle only on a peer server that holds
no new data shards, then verifies the deletion survives decode via
.ecj collection.

* ec: address review nits in decode and tests

- Remove double error wrapping in mountDecodedVolume
- Check VolumeUnmount error in peer ecj test
- Assert 404 specifically for deleted needles, fail on 5xx

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-04-01 01:15:26 -07:00

124 lines
4.1 KiB
Go

package storage
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
)
// CheckCompactVolume reports the garbage level of the volume identified by
// volumeId, as computed by the volume's garbageLevel method.
//
// It returns an error when the store has no volume with that id.
func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
	v := s.findVolume(volumeId)
	if v == nil {
		return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId)
	}
	// Evaluate the level exactly once so the value written to the log is
	// guaranteed to be the value handed back to the caller.
	level := v.garbageLevel()
	glog.V(3).Infof("volume %d garbage level: %f", volumeId, level)
	return level, nil
}
// CompactVolume runs CompactByIndex on the volume identified by vid after
// ensureCompactVolumeSpace has accepted the request.
//
// preallocate, compactionBytePerSecond and progressFn are forwarded as the
// PreallocateBytes, MaxBytesPerSecond and ProgressCallback compaction options.
// An error is returned when the volume is not found, when the space check
// fails, or when CompactByIndex fails.
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64, progressFn ProgressFunc) error {
	vol := s.findVolume(vid)
	if vol == nil {
		return fmt.Errorf("volume id %d is not found during compact", vid)
	}

	// Bail out before doing any work if the space check rejects the request.
	if spaceErr := ensureCompactVolumeSpace(vol, preallocate); spaceErr != nil {
		return spaceErr
	}

	opts := &CompactOptions{
		PreallocateBytes:  preallocate,
		MaxBytesPerSecond: compactionBytePerSecond,
		ProgressCallback:  progressFn,
	}
	return vol.CompactByIndex(opts)
}
// CommitCompactVolume calls CommitCompact on the volume identified by vid.
//
// It returns, in order: the volume's IsReadOnly state sampled before the
// commit, the first value reported by DataBackend.GetStat after a successful
// commit (0 when the commit failed or the volume has no data backend), and the
// error from CommitCompact. An error is also returned, without touching any
// volume, when the store is stopping or the volume is not found.
func (s *Store) CommitCompactVolume(vid needle.VolumeId) (bool, int64, error) {
	if s.isStopping {
		return false, 0, fmt.Errorf("volume id %d skips compact because volume is stopping", vid)
	}

	vol := s.findVolume(vid)
	if vol == nil {
		return false, 0, fmt.Errorf("volume id %d is not found during commit compact", vid)
	}

	// Sample the read-only flag first; the commit runs afterwards.
	wasReadOnly := vol.IsReadOnly()
	commitErr := vol.CommitCompact()

	var size int64
	if commitErr == nil && vol.DataBackend != nil {
		// Only the first GetStat result is used; the remaining values are
		// discarded, so size keeps whatever GetStat reports.
		size, _, _ = vol.DataBackend.GetStat()
	}
	return wasReadOnly, size, commitErr
}
// CommitCleanupVolume calls cleanupCompact on the volume identified by vid and
// returns its result, or an error when the store has no such volume.
func (s *Store) CommitCleanupVolume(vid needle.VolumeId) error {
	vol := s.findVolume(vid)
	if vol == nil {
		return fmt.Errorf("volume id %d is not found during cleaning up", vid)
	}
	return vol.cleanupCompact()
}
// ensureCompactVolumeSpace checks that the disk reported for v.dir has enough
// free space to compact v.
//
// The requirement is the larger of preallocate and the sum of the first two
// sizes returned by v.FileStat (logged below as "volume" and "index"). It
// returns an error describing the shortfall when the free space is smaller
// than that requirement, and nil otherwise.
func ensureCompactVolumeSpace(v *Volume, preallocate int64) error {
	// Only the two sizes are needed; the third FileStat result is unused here.
	datSize, idxSize, _ := v.FileStat()

	// The compacted copy is budgeted at roughly the current on-disk size,
	// unless the caller asked to preallocate at least that much.
	required := int64(datSize + idxSize)
	if preallocate >= required {
		required = preallocate
	}

	disk := stats.NewDiskStatus(v.dir)
	if int64(disk.Free) < required {
		return fmt.Errorf("insufficient free space for compaction: need %d bytes (volume: %d, index: %d), but only %d bytes available",
			required, datSize, idxSize, disk.Free)
	}

	glog.V(1).Infof("volume %d compaction space check: volume=%d, index=%d, space_needed=%d, free_space=%d",
		v.Id, datSize, idxSize, required, disk.Free)
	return nil
}
// CompactVolumeFiles compacts the on-disk files of volume vid in the given
// location without going through the store's volume lookup.
//
// The volume is loaded with loadVolumeWithoutWorker from location.Directory
// and location.IdxDirectory, checked with ensureCompactVolumeSpace, compacted
// with CompactByIndex and then committed with CommitCompact. If either the
// compact or the commit step fails, cleanupCompact is called before the error
// is returned. The temporary volume is always closed on return.
//
// The returned error wraps the underlying failure with %w on every path, so
// callers can inspect it with errors.Is / errors.As even when the cleanup
// itself also failed. The receiver is not used by this method.
func (s *Store) CompactVolumeFiles(vid needle.VolumeId, collection string, location *DiskLocation, needleMapKind NeedleMapKind, ldbTimeout int64, preallocate int64, compactionBytePerSecond int64) (err error) {
	if location == nil {
		return fmt.Errorf("volume %d compaction location is nil", vid)
	}

	tempVolume, err := loadVolumeWithoutWorker(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, ldbTimeout)
	if err != nil {
		return fmt.Errorf("load volume %d for offline compaction: %w", vid, err)
	}
	tempVolume.location = location

	// Release the temporary needle map (if one was created) and close the
	// volume regardless of how the function exits.
	defer func() {
		if tempVolume.tmpNm != nil {
			tempVolume.tmpNm.Close()
			tempVolume.tmpNm = nil
		}
		tempVolume.doClose()
	}()

	// fail runs cleanupCompact after a failed stage and wraps the stage error.
	// The stage error is always wrapped with %w; a cleanup failure is appended
	// to the message as text only.
	fail := func(stage string, cause error) error {
		if cleanupErr := tempVolume.cleanupCompact(); cleanupErr != nil {
			return fmt.Errorf("%s volume %d: %w (cleanup failed: %v)", stage, vid, cause, cleanupErr)
		}
		return fmt.Errorf("%s volume %d: %w", stage, vid, cause)
	}

	if err := ensureCompactVolumeSpace(tempVolume, preallocate); err != nil {
		return err
	}

	if err := tempVolume.CompactByIndex(&CompactOptions{
		PreallocateBytes:  preallocate,
		MaxBytesPerSecond: compactionBytePerSecond,
	}); err != nil {
		return fail("compact", err)
	}

	if err := tempVolume.CommitCompact(); err != nil {
		return fail("commit compact", err)
	}
	return nil
}