shell: update fs.verify and volume.fsck for new BFS signature
Updated dependent commands to match the refactored doTraverseBfsAndSaving signature and use context for channel sends.
This commit is contained in:
@@ -281,7 +281,7 @@ func (c *commandFsVerify) verifyEntry(path string, chunks []*filer_pb.FileChunk,
|
|||||||
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
|
func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errCount uint64, err error) {
|
||||||
timeNowAtSec := time.Now().Unix()
|
timeNowAtSec := time.Now().Unix()
|
||||||
return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
|
return fileCount, errCount, doTraverseBfsAndSaving(c.env, c.writer, path, false,
|
||||||
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
|
func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
|
||||||
if c.modifyTimeAgoAtSec > 0 {
|
if c.modifyTimeAgoAtSec > 0 {
|
||||||
if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
|
if entry.Entry.Attributes != nil && c.modifyTimeAgoAtSec < timeNowAtSec-entry.Entry.Attributes.Mtime {
|
||||||
return nil
|
return nil
|
||||||
@@ -293,9 +293,13 @@ func (c *commandFsVerify) verifyTraverseBfs(path string) (fileCount uint64, errC
|
|||||||
}
|
}
|
||||||
dataChunks = append(dataChunks, manifestChunks...)
|
dataChunks = append(dataChunks, manifestChunks...)
|
||||||
if len(dataChunks) > 0 {
|
if len(dataChunks) > 0 {
|
||||||
outputChan <- &ItemEntry{
|
select {
|
||||||
|
case outputChan <- &ItemEntry{
|
||||||
chunks: dataChunks,
|
chunks: dataChunks,
|
||||||
path: util.NewFullPath(entry.Dir, entry.Entry.Name),
|
path: util.NewFullPath(entry.Dir, entry.Entry.Name),
|
||||||
|
}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -241,7 +241,7 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
return doTraverseBfsAndSaving(c.env, c.writer, c.getCollectFilerFilePath(), false,
|
return doTraverseBfsAndSaving(c.env, c.writer, c.getCollectFilerFilePath(), false,
|
||||||
func(entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
|
func(ctx context.Context, entry *filer_pb.FullEntry, outputChan chan interface{}) (err error) {
|
||||||
if *c.verbose && entry.Entry.IsDirectory {
|
if *c.verbose && entry.Entry.IsDirectory {
|
||||||
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
|
fmt.Fprintf(c.writer, "checking directory %s\n", util.NewFullPath(entry.Dir, entry.Entry.Name))
|
||||||
}
|
}
|
||||||
@@ -257,11 +257,15 @@ func (c *commandVolumeFsck) collectFilerFileIdAndPaths(dataNodeVolumeIdToVInfo m
|
|||||||
if collectModifyFromAtNs != 0 && chunk.ModifiedTsNs < collectModifyFromAtNs {
|
if collectModifyFromAtNs != 0 && chunk.ModifiedTsNs < collectModifyFromAtNs {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
outputChan <- &Item{
|
select {
|
||||||
|
case outputChan <- &Item{
|
||||||
vid: chunk.Fid.VolumeId,
|
vid: chunk.Fid.VolumeId,
|
||||||
fileKey: chunk.Fid.FileKey,
|
fileKey: chunk.Fid.FileKey,
|
||||||
cookie: chunk.Fid.Cookie,
|
cookie: chunk.Fid.Cookie,
|
||||||
path: util.NewFullPath(entry.Dir, entry.Entry.Name),
|
path: util.NewFullPath(entry.Dir, entry.Entry.Name),
|
||||||
|
}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user