Fix volume.fsck -forcePurging -reallyDeleteFromVolume to fail fast on filer traversal errors (#8015)

* Add TraverseBfsWithContext and fix race conditions in error handling

- Add TraverseBfsWithContext function to support context cancellation
- Fix race condition in doTraverseBfsAndSaving using atomic.Bool and sync.Once
- Improve error handling with fail-fast behavior and proper error propagation
- Update command_volume_fsck to use error-returning saveFn callback
- Enhance error messages in readFilerFileIdFile with detailed context

* refactoring

* fix error format

* atomic

* filer_pb: make enqueue return void

* shell: simplify fs.meta.save error handling

* filer_pb: handle enqueue return value

* Revert "atomic"

This reverts commit 712648bc354b186d6654fdb8a46fd4848fdc4e00.

* shell: refine fs.meta.save logic

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
This commit is contained in:
Jaehoon Kim
2026-01-15 14:37:50 +09:00
committed by GitHub
parent 691aea84c3
commit f2e7af257d
8 changed files with 144 additions and 72 deletions

View File

@@ -109,9 +109,9 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
defer util_http.GetGlobalHttpClient().CloseIdleConnections()
return commandEnv.WithFilerClient(false, func(filerClient filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if entry.IsDirectory {
return
return nil
}
for _, chunk := range entry.Chunks {
chunkVolumeId := needle.VolumeId(chunk.Fid.VolumeId)
@@ -141,6 +141,7 @@ func (c *commandFsMergeVolumes) Do(args []string, commandEnv *CommandEnv, writer
fmt.Printf("failed to update %s: %v\n", path, err)
}
}
return nil
})
})
}

View File

@@ -75,13 +75,13 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv,
}
return commandEnv.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
return filer_pb.TraverseBfs(commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) {
return filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(*dir), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if !entry.IsDirectory {
var hasChanges bool
for _, chunk := range entry.Chunks {
if chunk.IsChunkManifest {
fmt.Printf("Change volume id for large file is not implemented yet: %s/%s\n", parentPath, entry.Name)
return
return nil
}
chunkVolumeId := chunk.Fid.VolumeId
if toVolumeId, found := mapping[needle.VolumeId(chunkVolumeId)]; found {
@@ -102,6 +102,7 @@ func (c *commandFsMetaChangeVolumeId) Do(args []string, commandEnv *CommandEnv,
}
}
}
return nil
})
})
}

View File

@@ -1,6 +1,7 @@
package shell
import (
"context"
"fmt"
"io"
@@ -51,7 +52,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
var dirCount, fileCount uint64
err = filer_pb.TraverseBfs(commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
err = filer_pb.TraverseBfs(context.Background(), commandEnv, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if entry.IsDirectory {
dirCount++
@@ -69,7 +70,7 @@ func (c *commandFsMetaNotify) Do(args []string, commandEnv *CommandEnv, writer i
if notifyErr != nil {
fmt.Fprintf(writer, "fail to notify new entry event for %s: %v\n", parentPath.Child(entry.Name), notifyErr)
}
return nil
})
if err == nil {

View File

@@ -118,14 +118,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
outputChan <- bytes
return nil
}, func(outputChan chan interface{}) {
}, func(outputChan chan interface{}) error {
sizeBuf := make([]byte, 4)
for item := range outputChan {
b := item.([]byte)
util.Uint32toBytes(sizeBuf, uint32(len(b)))
dst.Write(sizeBuf)
dst.Write(b)
_, err := dst.Write(sizeBuf)
if err != nil {
return err
}
_, err = dst.Write(b)
if err != nil {
return err
}
}
return nil
})
if err == nil {
@@ -136,17 +143,21 @@ func (c *commandFsMetaSave) Do(args []string, commandEnv *CommandEnv, writer io.
}
func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{})) error {
func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer, path string, verbose bool, genFn func(entry *filer_pb.FullEntry, outputChan chan interface{}) error, saveFn func(outputChan chan interface{}) error) error {
var wg sync.WaitGroup
wg.Add(1)
outputChan := make(chan interface{}, 1024)
saveErrChan := make(chan error, 1)
go func() {
saveFn(outputChan)
saveErrChan <- saveFn(outputChan)
wg.Done()
}()
var dirCount, fileCount uint64
var once sync.Once
var firstErr error
var hasErr atomic.Bool
// also save the directory itself (path) if it exists in the filer
if e, getErr := filer_pb.GetEntry(context.Background(), filerClient, util.FullPath(path)); getErr != nil {
@@ -160,8 +171,13 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
Dir: parentDir,
Entry: e,
}
if genErr := genFn(protoMessage, outputChan); genErr != nil {
fmt.Fprintf(writer, "marshall error: %v\n", genErr)
once.Do(func() {
firstErr = genErr
hasErr.Store(true)
})
return genErr
} else {
if e.IsDirectory {
atomic.AddUint64(&dirCount, 1)
@@ -171,10 +187,13 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
}
}
err := filer_pb.TraverseBfs(filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := filer_pb.TraverseBfs(ctx, filerClient, util.FullPath(path), func(parentPath util.FullPath, entry *filer_pb.Entry) error {
if strings.HasPrefix(string(parentPath), filer.SystemLogDir) {
return
parent := string(parentPath)
if parent == filer.SystemLogDir || strings.HasPrefix(parent, filer.SystemLogDir+"/") {
return nil
}
protoMessage := &filer_pb.FullEntry{
@@ -182,9 +201,17 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
Entry: entry,
}
if err := genFn(protoMessage, outputChan); err != nil {
fmt.Fprintf(writer, "marshall error: %v\n", err)
return
if hasErr.Load() {
// fail-fast: stop traversal once an error is observed.
return firstErr
}
if genErr := genFn(protoMessage, outputChan); genErr != nil {
once.Do(func() {
firstErr = genErr
hasErr.Store(true)
cancel()
})
return genErr
}
if entry.IsDirectory {
@@ -197,14 +224,23 @@ func doTraverseBfsAndSaving(filerClient filer_pb.FilerClient, writer io.Writer,
println(parentPath.Child(entry.Name))
}
return nil
})
close(outputChan)
wg.Wait()
saveErr := <-saveErrChan
if err == nil && writer != nil {
if err != nil {
return err
}
if saveErr != nil {
return saveErr
}
if writer != nil {
fmt.Fprintf(writer, "total %d directories, %d files\n", dirCount, fileCount)
}
return err
return nil
}

View File

@@ -300,7 +300,7 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
}
return nil
},
func(outputChan chan interface{}) {
func(outputChan chan interface{}) error {
var wg sync.WaitGroup
itemErrCount := atomic.NewUint64(0)
for itemEntry := range outputChan {
@@ -315,5 +315,6 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
}
wg.Wait()
errCount = itemErrCount.Load()
return nil
})
}

View File

@@ -266,7 +266,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
}
return nil
},
func(outputChan chan interface{}) {
func(outputChan chan interface{}) error {
buffer := make([]byte, readbufferSize)
for item := range outputChan {
i := item.(*Item)
@@ -274,8 +274,12 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
util.Uint64toBytes(buffer, i.fileKey)
util.Uint32toBytes(buffer[8:], i.cookie)
util.Uint32toBytes(buffer[12:], uint32(len(i.path)))
f.Write(buffer)
f.Write([]byte(i.path))
if _, err := f.Write(buffer); err != nil {
return err
}
if _, err := f.Write([]byte(i.path)); err != nil {
return err
}
} else if *c.findMissingChunksInFiler && len(c.volumeIds) == 0 {
fmt.Fprintf(c.writer, "%d,%x%08x %s volume not found\n", i.vid, i.fileKey, i.cookie, i.path)
if purgeAbsent {
@@ -284,6 +288,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
}
}
}
return nil
})
}
@@ -482,10 +487,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
break
}
if readErr != nil {
return readErr
return fmt.Errorf("read fid header for volume %d: %w", volumeId, readErr)
}
if readSize != readbufferSize {
return fmt.Errorf("readSize mismatch")
return fmt.Errorf("read fid header size mismatch for volume %d: got %d want %d", volumeId, readSize, readbufferSize)
}
item.fileKey = util.BytesToUint64(buffer[:8])
item.cookie = util.BytesToUint32(buffer[8:12])
@@ -493,10 +498,10 @@ func (c *commandVolumeFsck) readFilerFileIdFile(volumeId uint32, fn func(needleI
pathBytes := make([]byte, int(pathSize))
n, err := io.ReadFull(br, pathBytes)
if err != nil {
fmt.Fprintf(c.writer, "%d,%x%08x in unexpected error: %v\n", volumeId, item.fileKey, item.cookie, err)
return fmt.Errorf("read fid path for volume %d,%x%08x: %w", volumeId, item.fileKey, item.cookie, err)
}
if n != int(pathSize) {
fmt.Fprintf(c.writer, "%d,%x%08x %d unexpected file name size %d\n", volumeId, item.fileKey, item.cookie, pathSize, n)
return fmt.Errorf("read fid path size mismatch for volume %d,%x%08x: got %d want %d", volumeId, item.fileKey, item.cookie, n, pathSize)
}
item.path = util.FullPath(pathBytes)
needleId := types.NeedleId(item.fileKey)
@@ -748,10 +753,10 @@ func writeToFile(bytes []byte, fileName string) error {
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
return nil
return err
}
defer dst.Close()
dst.Write(bytes)
return nil
_, err = dst.Write(bytes)
return err
}