feat(filer): add lazy directory listing for remote mounts (#8615)

* feat(filer): add lazy directory listing for remote mounts

Directory listings on remote mounts previously only queried the local
filer store. With lazy mounts the listing was empty; with eager mounts
it went stale over time.

Add on-demand directory listing that fetches from remote and caches
results with a 5-minute TTL:

- Add `ListDirectory` to `RemoteStorageClient` interface (delimiter-based,
  single-level listing, separate from recursive `Traverse`)
- Implement in S3, GCS, and Azure backends using each platform's
  hierarchical listing API
- Add `maybeLazyListFromRemote` to filer: before each directory listing,
  check if the directory is under a remote mount with an expired cache,
  fetch from remote, persist entries to the local store, then let existing
  listing logic run on the populated store
- Use singleflight to deduplicate concurrent requests for the same directory
- Skip local-only entries (no RemoteEntry) to avoid overwriting unsynced uploads
- Errors are logged and swallowed (availability over consistency)

* refactor: extract xattr key to constant xattrRemoteListingSyncedAt

* feat: make listing cache TTL configurable per mount via listing_cache_ttl_seconds

Add listing_cache_ttl_seconds field to RemoteStorageLocation protobuf.
When 0 (default), lazy directory listing is disabled for that mount.
When >0, enables on-demand directory listing with the specified TTL.

Expose as -listingCacheTTL flag on remote.mount command.

* refactor: address review feedback for lazy directory listing

- Add context.Context to ListDirectory interface and all implementations
- Capture startTime before remote call for accurate TTL tracking
- Simplify S3 ListDirectory using ListObjectsV2PagesWithContext
- Make maybeLazyListFromRemote return void (errors always swallowed)
- Remove redundant trailing-slash path manipulation in caller
- Update tests to match new signatures

* When an existing entry has Remote != nil, we should merge remote metadata   into it rather than replacing it.

* fix(gcs): wrap ListDirectory iterator error with context

The raw iterator error was returned without bucket/path context,
making it harder to debug. Wrap it consistently with the S3 pattern.

* fix(s3): guard against nil pointer dereference in Traverse and ListDirectory

Some S3-compatible backends may return nil for LastModified, Size, or
ETag fields. Check for nil before dereferencing to prevent panics.

* fix(filer): remove blanket 2-minute timeout from lazy listing context

Individual SDK operations (S3, GCS, Azure) already have per-request
timeouts and retry policies. The blanket timeout could cut off large
directory listings mid-operation even though individual pages were
succeeding.

* fix(filer): preserve trace context in lazy listing with WithoutCancel

Use context.WithoutCancel(ctx) instead of context.Background() so
trace/span values from the incoming request are retained for
distributed tracing, while still decoupling cancellation.

* fix(filer): use Store.FindEntry for internal lookups, add Uid/Gid to files, fix updateDirectoryListingSyncedAt

- Use f.Store.FindEntry instead of f.FindEntry for staleness check and
  child lookups to avoid unnecessary lazy-fetch overhead
- Set OS_UID/OS_GID on new file entries for consistency with directories
- In updateDirectoryListingSyncedAt, use Store.UpdateEntry for existing
  directories instead of CreateEntry to avoid deleteChunksIfNotNew and
  NotifyUpdateEvent side effects

* fix(filer): distinguish not-found from store errors in lazy listing

Previously, any error from Store.FindEntry was treated as "not found,"
which could cause entry recreation/overwrite on transient DB failures.
Now check for filer_pb.ErrNotFound explicitly and skip entries or
bail out on real store errors.

* refactor(filer): use errors.Is for ErrNotFound comparisons
This commit is contained in:
Chris Lu
2026-03-13 09:36:54 -07:00
committed by GitHub
parent 3fe5a7d761
commit f3c5ba3cd6
10 changed files with 687 additions and 13 deletions

View File

@@ -127,6 +127,68 @@ type azureRemoteStorageClient struct {
var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
func (az *azureRemoteStorageClient) ListDirectory(ctx context.Context, loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := loc.Path[1:]
if pathKey != "" && !strings.HasSuffix(pathKey, "/") {
pathKey += "/"
}
containerClient := az.client.ServiceClient().NewContainerClient(loc.Bucket)
pager := containerClient.NewListBlobsHierarchyPager("/", &container.ListBlobsHierarchyOptions{
Prefix: &pathKey,
})
for pager.More() {
resp, pageErr := pager.NextPage(ctx)
if pageErr != nil {
return fmt.Errorf("azure list directory %s%s: %w", loc.Bucket, loc.Path, pageErr)
}
for _, prefix := range resp.Segment.BlobPrefixes {
if prefix.Name == nil {
continue
}
dirKey := "/" + strings.TrimSuffix(*prefix.Name, "/")
dir, name := util.FullPath(dirKey).DirAndName()
if err = visitFn(dir, name, true, nil); err != nil {
return fmt.Errorf("azure processing directory prefix %s: %w", *prefix.Name, err)
}
}
for _, blobItem := range resp.Segment.BlobItems {
if blobItem.Name == nil {
continue
}
key := "/" + *blobItem.Name
if strings.HasSuffix(key, "/") {
continue // skip directory markers
}
dir, name := util.FullPath(key).DirAndName()
remoteEntry := &filer_pb.RemoteEntry{
StorageName: az.conf.Name,
}
if blobItem.Properties != nil {
if blobItem.Properties.LastModified != nil {
remoteEntry.RemoteMtime = blobItem.Properties.LastModified.Unix()
}
if blobItem.Properties.ContentLength != nil {
remoteEntry.RemoteSize = *blobItem.Properties.ContentLength
}
if blobItem.Properties.ETag != nil {
remoteEntry.RemoteETag = string(*blobItem.Properties.ETag)
}
}
if err = visitFn(dir, name, false, remoteEntry); err != nil {
return fmt.Errorf("azure processing blob %s: %w", *blobItem.Name, err)
}
}
}
return nil
}
func (az *azureRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
key := loc.Path[1:]
ctx, cancel := context.WithTimeout(context.Background(), DefaultAzureOpTimeout)

View File

@@ -131,6 +131,52 @@ func (gcs *gcsRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation
const defaultGCSOpTimeout = 30 * time.Second
func (gcs *gcsRemoteStorageClient) ListDirectory(ctx context.Context, loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := loc.Path[1:]
if pathKey != "" && !strings.HasSuffix(pathKey, "/") {
pathKey += "/"
}
objectIterator := gcs.client.Bucket(loc.Bucket).Objects(ctx, &storage.Query{
Delimiter: "/",
Prefix: pathKey,
Versions: false,
})
for {
objectAttr, iterErr := objectIterator.Next()
if iterErr != nil {
if iterErr == iterator.Done {
return nil
}
return fmt.Errorf("list directory %s%s: %w", loc.Bucket, loc.Path, iterErr)
}
if objectAttr.Prefix != "" {
// Common prefix → subdirectory
dirKey := "/" + strings.TrimSuffix(objectAttr.Prefix, "/")
dir, name := util.FullPath(dirKey).DirAndName()
if err = visitFn(dir, name, true, nil); err != nil {
return err
}
} else {
key := "/" + objectAttr.Name
if strings.HasSuffix(key, "/") {
continue // skip directory markers
}
dir, name := util.FullPath(key).DirAndName()
if err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
RemoteMtime: objectAttr.Updated.Unix(),
RemoteSize: objectAttr.Size,
RemoteETag: objectAttr.Etag,
StorageName: gcs.conf.Name,
}); err != nil {
return err
}
}
}
}
func (gcs *gcsRemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
key := loc.Path[1:]
ctx, cancel := context.WithTimeout(context.Background(), defaultGCSOpTimeout)

View File

@@ -1,6 +1,7 @@
package remote_storage
import (
"context"
"errors"
"fmt"
"io"
@@ -75,6 +76,7 @@ var ErrRemoteObjectNotFound = errors.New("remote object not found")
type RemoteStorageClient interface {
Traverse(loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
ListDirectory(ctx context.Context, loc *remote_pb.RemoteStorageLocation, visitFn VisitFunc) error
StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error)
ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error)
WriteDirectory(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry) (err error)

View File

@@ -1,10 +1,12 @@
package s3
import (
"context"
"fmt"
"io"
"net/http"
"reflect"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -98,12 +100,19 @@ func (s *s3RemoteStorageClient) Traverse(remote *remote_pb.RemoteStorageLocation
key := *content.Key
key = "/" + key
dir, name := util.FullPath(key).DirAndName()
if err := visitFn(dir, name, false, &filer_pb.RemoteEntry{
RemoteMtime: (*content.LastModified).Unix(),
RemoteSize: *content.Size,
RemoteETag: *content.ETag,
remoteEntry := &filer_pb.RemoteEntry{
StorageName: s.conf.Name,
}); err != nil {
}
if content.LastModified != nil {
remoteEntry.RemoteMtime = content.LastModified.Unix()
}
if content.Size != nil {
remoteEntry.RemoteSize = *content.Size
}
if content.ETag != nil {
remoteEntry.RemoteETag = *content.ETag
}
if err := visitFn(dir, name, false, remoteEntry); err != nil {
localErr = err
return false
}
@@ -122,6 +131,65 @@ func (s *s3RemoteStorageClient) Traverse(remote *remote_pb.RemoteStorageLocation
return
}
func (s *s3RemoteStorageClient) ListDirectory(ctx context.Context, loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) error {
pathKey := loc.Path[1:]
if pathKey != "" && !strings.HasSuffix(pathKey, "/") {
pathKey += "/"
}
listInput := &s3.ListObjectsV2Input{
Bucket: aws.String(loc.Bucket),
Prefix: aws.String(pathKey),
Delimiter: aws.String("/"),
}
var localErr error
listErr := s.conn.ListObjectsV2PagesWithContext(ctx, listInput, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
for _, prefix := range page.CommonPrefixes {
if prefix.Prefix == nil {
continue
}
dirKey := "/" + strings.TrimSuffix(*prefix.Prefix, "/")
dir, name := util.FullPath(dirKey).DirAndName()
if err := visitFn(dir, name, true, nil); err != nil {
localErr = err
return false
}
}
for _, content := range page.Contents {
key := "/" + *content.Key
if strings.HasSuffix(key, "/") {
continue // skip directory markers
}
dir, name := util.FullPath(key).DirAndName()
remoteEntry := &filer_pb.RemoteEntry{
StorageName: s.conf.Name,
}
if content.LastModified != nil {
remoteEntry.RemoteMtime = content.LastModified.Unix()
}
if content.Size != nil {
remoteEntry.RemoteSize = *content.Size
}
if content.ETag != nil {
remoteEntry.RemoteETag = *content.ETag
}
if err := visitFn(dir, name, false, remoteEntry); err != nil {
localErr = err
return false
}
}
return true
})
if listErr != nil {
return fmt.Errorf("list directory %v: %w", loc, listErr)
}
if localErr != nil {
return fmt.Errorf("process directory %v: %w", loc, localErr)
}
return nil
}
func (s *s3RemoteStorageClient) StatFile(loc *remote_pb.RemoteStorageLocation) (remoteEntry *filer_pb.RemoteEntry, err error) {
resp, err := s.conn.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(loc.Bucket),