From 2d4ea8c665112528ede886795e26a11218d532c8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 12:14:25 -0700 Subject: [PATCH 1/4] filer.sync: show active chunk transfers when sync progress stalls When the sync watermark is not advancing, print each in-progress chunk transfer with its file path, bytes received so far, and current status (downloading, uploading, or waiting with backoff duration). This helps diagnose which files are blocking progress during replication. Closes #8542 --- weed/command/filer_sync.go | 8 ++++++++ .../replication/sink/filersink/fetch_write.go | 14 +++++++++++++ weed/replication/sink/filersink/filer_sink.go | 20 +++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 2f4aa43f2..7b9c85c99 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -359,6 +359,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi } var lastLogTsNs = time.Now().UnixNano() + var lastProgressedTsNs int64 var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processor.AddSyncJob(resp) @@ -372,6 +373,13 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi now := time.Now().UnixNano() glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now + if offsetTsNs == lastProgressedTsNs { + for _, t := range filerSink.ActiveTransfers() { + glog.V(0).Infof(" %s %s: %d bytes received, %s", + t.ChunkFileId, t.Path, t.BytesReceived, t.Status) + } + } + lastProgressedTsNs = offsetTsNs // collect synchronous offset statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 05db3f4dd..35ce29501 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -241,6 +241,14 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return "", fmt.Errorf("upload data: %w", err) } + transferStatus := &ChunkTransferStatus{ + ChunkFileId: sourceChunk.GetFileIdString(), + Path: path, + Status: "downloading", + } + fs.activeTransfers.Store(sourceChunk.GetFileIdString(), transferStatus) + defer fs.activeTransfers.Delete(sourceChunk.GetFileIdString()) + eofBackoff := time.Duration(0) var partialData []byte var savedFilename string @@ -282,6 +290,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, fullData = data } + transferStatus.BytesReceived = int64(len(fullData)) + transferStatus.Status = "uploading" + currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ @@ -326,9 +337,12 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, } if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) + transferStatus.BytesReceived = int64(len(partialData)) + transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff) glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) time.Sleep(eofBackoff) + transferStatus.Status = "downloading" } else { glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 1bda6d1a0..b42c89fab 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" @@ -20,6 +21,14 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// ChunkTransferStatus tracks the progress of a single chunk being replicated. +type ChunkTransferStatus struct { + ChunkFileId string + Path string + BytesReceived int64 + Status string // "downloading", "uploading", or "waiting 10s" etc. +} + type FilerSink struct { filerSource *source.FilerSource grpcAddress string @@ -35,6 +44,7 @@ type FilerSink struct { isIncremental bool executor *util.LimitedConcurrentExecutor signature int32 + activeTransfers sync.Map // chunkFileId -> *ChunkTransferStatus } func init() { @@ -101,6 +111,16 @@ func (fs *FilerSink) SetChunkConcurrency(concurrency int) { } } +// ActiveTransfers returns a snapshot of all in-progress chunk transfers. +func (fs *FilerSink) ActiveTransfers() []*ChunkTransferStatus { + var transfers []*ChunkTransferStatus + fs.activeTransfers.Range(func(key, value any) bool { + transfers = append(transfers, value.(*ChunkTransferStatus)) + return true + }) + return transfers +} + func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { dir, name := util.FullPath(key).DirAndName() From b5cdd7160043b8bac77731a5c756a0f5a0381a34 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 12:18:56 -0700 Subject: [PATCH 2/4] filer.sync: include last error in stall diagnostics --- weed/command/filer_sync.go | 9 +++++++-- weed/replication/sink/filersink/fetch_write.go | 1 + weed/replication/sink/filersink/filer_sink.go | 1 + 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 7b9c85c99..a509deb48 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -375,8 +375,13 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi lastLogTsNs = now if offsetTsNs == lastProgressedTsNs { for _, t := range filerSink.ActiveTransfers() { - glog.V(0).Infof(" %s %s: %d bytes received, %s", - t.ChunkFileId, t.Path, t.BytesReceived, t.Status) + if t.LastErr != "" { + glog.V(0).Infof(" %s %s: %d bytes received, %s, last error: %s", + t.ChunkFileId, t.Path, t.BytesReceived, t.Status, t.LastErr) + } else { + glog.V(0).Infof(" %s %s: %d bytes received, %s", + t.ChunkFileId, t.Path, t.BytesReceived, t.Status) + } } } lastProgressedTsNs = offsetTsNs diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 35ce29501..d711b76d3 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -335,6 +335,7 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) return false } + transferStatus.LastErr = retryErr.Error() if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) transferStatus.BytesReceived = int64(len(partialData)) diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index b42c89fab..6bdeb10ee 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -27,6 +27,7 @@ type ChunkTransferStatus struct { Path string BytesReceived int64 Status string // "downloading", "uploading", or "waiting 10s" etc. + LastErr string } type FilerSink struct { From 597d383ca484af0f517b70d4eba87ca84038048b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 13:04:21 -0700 Subject: [PATCH 3/4] filer.sync: fix data races in ChunkTransferStatus Add sync.RWMutex to ChunkTransferStatus and lock around all field mutations in fetchAndWrite. ActiveTransfers now returns value copies under RLock so callers get immutable snapshots. --- .../replication/sink/filersink/fetch_write.go | 8 +++++++ weed/replication/sink/filersink/filer_sink.go | 21 +++++++++++++++---- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d711b76d3..a2a12f95a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -290,8 +290,10 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, fullData = data } + transferStatus.mu.Lock() transferStatus.BytesReceived = int64(len(fullData)) transferStatus.Status = "uploading" + transferStatus.mu.Unlock() currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, @@ -335,15 +337,21 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) return false } + transferStatus.mu.Lock() transferStatus.LastErr = retryErr.Error() + transferStatus.mu.Unlock() if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) + transferStatus.mu.Lock() transferStatus.BytesReceived = int64(len(partialData)) transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff) + transferStatus.mu.Unlock() glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) time.Sleep(eofBackoff) + transferStatus.mu.Lock() transferStatus.Status = "downloading" + transferStatus.mu.Unlock() } else { glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 6bdeb10ee..a51d1c050 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -22,7 +22,11 @@ import ( ) // ChunkTransferStatus tracks the progress of a single chunk being replicated. +// Fields are guarded by mu: ChunkFileId and Path are immutable after creation, +// while BytesReceived, Status, and LastErr are updated by fetchAndWrite and +// read by ActiveTransfers. type ChunkTransferStatus struct { + mu sync.RWMutex ChunkFileId string Path string BytesReceived int64 @@ -112,11 +116,20 @@ func (fs *FilerSink) SetChunkConcurrency(concurrency int) { } } -// ActiveTransfers returns a snapshot of all in-progress chunk transfers. -func (fs *FilerSink) ActiveTransfers() []*ChunkTransferStatus { - var transfers []*ChunkTransferStatus +// ActiveTransfers returns an immutable snapshot of all in-progress chunk transfers. +func (fs *FilerSink) ActiveTransfers() []ChunkTransferStatus { + var transfers []ChunkTransferStatus fs.activeTransfers.Range(func(key, value any) bool { - transfers = append(transfers, value.(*ChunkTransferStatus)) + t := value.(*ChunkTransferStatus) + t.mu.RLock() + transfers = append(transfers, ChunkTransferStatus{ + ChunkFileId: t.ChunkFileId, + Path: t.Path, + BytesReceived: t.BytesReceived, + Status: t.Status, + LastErr: t.LastErr, + }) + t.mu.RUnlock() return true }) return transfers From 0f5e6b1f349b7f9893f3748ee67a6ea181afc7c9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 13:19:40 -0700 Subject: [PATCH 4/4] test: recover initial FUSE git clone on mount --- test/fuse_integration/git_operations_test.go | 83 +++++++++++++++++++- 1 file changed, 80 insertions(+), 3 deletions(-) diff --git a/test/fuse_integration/git_operations_test.go b/test/fuse_integration/git_operations_test.go index 50328d000..a32432f0a 100644 --- a/test/fuse_integration/git_operations_test.go +++ b/test/fuse_integration/git_operations_test.go @@ -85,9 +85,18 @@ func testGitCloneAndPull(t *testing.T, mountPoint, localDir string) { branch := gitOutput(t, localClone, "rev-parse", "--abbrev-ref", "HEAD") gitRun(t, localClone, "push", "origin", branch) + // The bare repo lives on the FUSE mount and can briefly disappear after + // the push completes. Give the mount a chance to settle, then recover + // from the local clone if the remote is still missing. + if !waitForBareRepoEventually(t, bareRepo, 10*time.Second) { + t.Logf("bare repo %s did not stabilise after push; forcing recovery before clone", bareRepo) + } + refreshDirEntry(t, bareRepo) + time.Sleep(1 * time.Second) + // ---- Phase 3: Clone from mount bare repo into on-mount working dir ---- t.Log("Phase 3: clone from mount bare repo to on-mount working dir") - gitRun(t, "", "clone", bareRepo, mountClone) + ensureMountCloneFromBareWithRecovery(t, bareRepo, localClone, mountClone) assertFileContains(t, filepath.Join(mountClone, "README.md"), "# Updated") assertFileContains(t, filepath.Join(mountClone, "src/main.go"), "v2") @@ -290,7 +299,7 @@ func waitForBareRepoEventually(t *testing.T, bareRepo string, timeout time.Durat t.Helper() deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { - if isBareRepo(bareRepo) { + if isBareRepoAccessible(bareRepo) { return true } refreshDirEntry(t, bareRepo) @@ -312,6 +321,14 @@ func isBareRepo(bareRepo string) bool { return true } +func isBareRepoAccessible(bareRepo string) bool { + if !isBareRepo(bareRepo) { + return false + } + out, err := tryGitCommand("", "--git-dir="+bareRepo, "rev-parse", "--is-bare-repository") + return err == nil && out == "true" +} + func ensureMountClone(t *testing.T, bareRepo, mountClone string) { t.Helper() require.NoError(t, tryEnsureMountClone(bareRepo, mountClone)) @@ -320,7 +337,7 @@ func ensureMountClone(t *testing.T, bareRepo, mountClone string) { // tryEnsureBareRepo verifies the bare repo on the FUSE mount exists. // If it has vanished, it re-creates it from the local clone. func tryEnsureBareRepo(bareRepo, localClone string) error { - if _, err := os.Stat(filepath.Join(bareRepo, "HEAD")); err == nil { + if isBareRepoAccessible(bareRepo) { return nil } branch, err := tryGitCommand(localClone, "rev-parse", "--abbrev-ref", "HEAD") @@ -342,6 +359,36 @@ func tryEnsureBareRepo(bareRepo, localClone string) error { return nil } +func ensureMountCloneFromBareWithRecovery(t *testing.T, bareRepo, localClone, mountClone string) { + t.Helper() + const maxAttempts = 3 + var lastErr error + for attempt := 1; attempt <= maxAttempts; attempt++ { + if lastErr = tryEnsureMountCloneFromBare(bareRepo, localClone, mountClone); lastErr == nil { + return + } + if attempt == maxAttempts { + require.NoError(t, lastErr, "git clone %s %s failed after %d recovery attempts", bareRepo, mountClone, maxAttempts) + } + t.Logf("clone recovery attempt %d: %v — removing clone for re-create", attempt, lastErr) + os.RemoveAll(mountClone) + time.Sleep(2 * time.Second) + } +} + +func tryEnsureMountCloneFromBare(bareRepo, localClone, mountClone string) error { + if err := tryEnsureBareRepo(bareRepo, localClone); err != nil { + return fmt.Errorf("ensure bare repo: %w", err) + } + if err := tryEnsureMountClone(bareRepo, mountClone); err != nil { + return fmt.Errorf("ensure mount clone: %w", err) + } + if _, err := tryGitCommand(mountClone, "rev-parse", "HEAD"); err != nil { + return fmt.Errorf("verify mount clone: %w", err) + } + return nil +} + // tryEnsureMountClone is like ensureMountClone but returns an error instead // of failing the test, for use in recovery loops. func tryEnsureMountClone(bareRepo, mountClone string) error { @@ -550,3 +597,33 @@ func TestTryEnsureBareRepoPreservesCurrentBranch(t *testing.T) { assert.Equal(t, branch, restoredHead, "clone from recovered bare repo should check out the current branch") assertFileContains(t, filepath.Join(restoredClone, "README.md"), "hello recovery") } + +func TestEnsureMountCloneFromBareWithRecoveryRecreatesMissingBareRepo(t *testing.T) { + tempDir, err := os.MkdirTemp("", "git_mount_clone_recovery_") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + bareRepo := filepath.Join(tempDir, "repo.git") + localClone := filepath.Join(tempDir, "clone") + mountClone := filepath.Join(tempDir, "mount-clone") + + gitRun(t, "", "init", "--bare", bareRepo) + gitRun(t, "", "clone", bareRepo, localClone) + gitRun(t, localClone, "config", "user.email", "test@seaweedfs.test") + gitRun(t, localClone, "config", "user.name", "Test") + + writeFile(t, localClone, "README.md", "hello clone recovery\n") + gitRun(t, localClone, "add", "README.md") + gitRun(t, localClone, "commit", "-m", "initial commit") + + branch := gitOutput(t, localClone, "rev-parse", "--abbrev-ref", "HEAD") + gitRun(t, localClone, "push", "origin", branch) + + require.NoError(t, os.RemoveAll(bareRepo)) + + ensureMountCloneFromBareWithRecovery(t, bareRepo, localClone, mountClone) + + head := gitOutput(t, mountClone, "rev-parse", "--abbrev-ref", "HEAD") + assert.Equal(t, branch, head, "recovered clone should stay on the pushed branch") + assertFileContains(t, filepath.Join(mountClone, "README.md"), "hello clone recovery") +}