This commit is contained in:
Chris Lu
2021-08-29 19:13:48 -07:00
parent 6deee4c0b9
commit eacaa44dc2
2 changed files with 5 additions and 6 deletions

View File

@@ -66,7 +66,7 @@ var _ = remote_storage.RemoteStorageClient(&hdfsRemoteStorageClient{})
func (c *hdfsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
return TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
return remote_storage.TraverseBfs(func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error {
children, err := c.client.ReadDir(string(parentDir))
if err != nil {
return err

View File

@@ -1,63 +0,0 @@
package hdfs
import (
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/util"
"sync"
"time"
)
type ListDirectoryFunc func(parentDir util.FullPath, visitFn remote_storage.VisitFunc) error
func TraverseBfs(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc) (err error) {
K := 5
var dirQueueWg sync.WaitGroup
dirQueue := util.NewQueue()
dirQueueWg.Add(1)
dirQueue.Enqueue(parentPath)
var isTerminating bool
for i := 0; i < K; i++ {
go func() {
for {
if isTerminating {
break
}
t := dirQueue.Dequeue()
if t == nil {
time.Sleep(329 * time.Millisecond)
continue
}
dir := t.(util.FullPath)
processErr := processOneDirectory(listDirFn, dir, visitFn, dirQueue, &dirQueueWg)
if processErr != nil {
err = processErr
}
dirQueueWg.Done()
}
}()
}
dirQueueWg.Wait()
isTerminating = true
return
}
func processOneDirectory(listDirFn ListDirectoryFunc, parentPath util.FullPath, visitFn remote_storage.VisitFunc, dirQueue *util.Queue, dirQueueWg *sync.WaitGroup) (error) {
return listDirFn(parentPath, func(dir string, name string, isDirectory bool, remoteEntry *filer_pb.RemoteEntry) error {
if err := visitFn(dir, name, isDirectory, remoteEntry); err != nil {
return err
}
if !isDirectory {
return nil
}
dirQueueWg.Add(1)
dirQueue.Enqueue(parentPath.Child(name))
return nil
})
}