From 9552e80b582532dfe084c32971e73607a47af055 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 13:08:24 -0700 Subject: [PATCH] filer.sync: show active chunk transfers when sync progress stalls (#8889) * filer.sync: show active chunk transfers when sync progress stalls When the sync watermark is not advancing, print each in-progress chunk transfer with its file path, bytes received so far, and current status (downloading, uploading, or waiting with backoff duration). This helps diagnose which files are blocking progress during replication. Closes #8542 * filer.sync: include last error in stall diagnostics * filer.sync: fix data races in ChunkTransferStatus Add sync.RWMutex to ChunkTransferStatus and lock around all field mutations in fetchAndWrite. ActiveTransfers now returns value copies under RLock so callers get immutable snapshots. --- weed/command/filer_sync.go | 13 +++++++ .../replication/sink/filersink/fetch_write.go | 23 +++++++++++++ weed/replication/sink/filersink/filer_sink.go | 34 +++++++++++++++++++ 3 files changed, 70 insertions(+) diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 2f4aa43f2..a509deb48 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -359,6 +359,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi } var lastLogTsNs = time.Now().UnixNano() + var lastProgressedTsNs int64 var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processor.AddSyncJob(resp) @@ -372,6 +373,18 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi now := time.Now().UnixNano() glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now + if offsetTsNs == lastProgressedTsNs { + for _, t := 
range filerSink.ActiveTransfers() { + if t.LastErr != "" { + glog.V(0).Infof(" %s %s: %d bytes received, %s, last error: %s", + t.ChunkFileId, t.Path, t.BytesReceived, t.Status, t.LastErr) + } else { + glog.V(0).Infof(" %s %s: %d bytes received, %s", + t.ChunkFileId, t.Path, t.BytesReceived, t.Status) + } + } + } + lastProgressedTsNs = offsetTsNs // collect synchronous offset statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 05db3f4dd..a2a12f95a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -241,6 +241,14 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return "", fmt.Errorf("upload data: %w", err) } + transferStatus := &ChunkTransferStatus{ + ChunkFileId: sourceChunk.GetFileIdString(), + Path: path, + Status: "downloading", + } + fs.activeTransfers.Store(sourceChunk.GetFileIdString(), transferStatus) + defer fs.activeTransfers.Delete(sourceChunk.GetFileIdString()) + eofBackoff := time.Duration(0) var partialData []byte var savedFilename string @@ -282,6 +290,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, fullData = data } + transferStatus.mu.Lock() + transferStatus.BytesReceived = int64(len(fullData)) + transferStatus.Status = "uploading" + transferStatus.mu.Unlock() + currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ @@ -324,11 +337,21 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) return false } + 
transferStatus.mu.Lock() + transferStatus.LastErr = retryErr.Error() + transferStatus.mu.Unlock() if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) + transferStatus.mu.Lock() + transferStatus.BytesReceived = int64(len(partialData)) + transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff) + transferStatus.mu.Unlock() glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) time.Sleep(eofBackoff) + transferStatus.mu.Lock() + transferStatus.Status = "downloading" + transferStatus.mu.Unlock() } else { glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 1bda6d1a0..a51d1c050 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" @@ -20,6 +21,19 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// ChunkTransferStatus tracks the progress of a single chunk being replicated. +// ChunkFileId and Path are set once at creation and never modified; mu guards +// BytesReceived, Status, and LastErr, which are updated by fetchAndWrite and +// read by ActiveTransfers. +type ChunkTransferStatus struct { + mu sync.RWMutex + ChunkFileId string + Path string + BytesReceived int64 + Status string // "downloading", "uploading", or "waiting <backoff>" (e.g. "waiting 10s") 
+ LastErr string +} + type FilerSink struct { filerSource *source.FilerSource grpcAddress string @@ -35,6 +49,7 @@ type FilerSink struct { isIncremental bool executor *util.LimitedConcurrentExecutor signature int32 + activeTransfers sync.Map // chunkFileId -> *ChunkTransferStatus } func init() { @@ -101,6 +116,25 @@ func (fs *FilerSink) SetChunkConcurrency(concurrency int) { } } +// ActiveTransfers returns an immutable snapshot of all in-progress chunk transfers. +func (fs *FilerSink) ActiveTransfers() []ChunkTransferStatus { + var transfers []ChunkTransferStatus + fs.activeTransfers.Range(func(key, value any) bool { + t := value.(*ChunkTransferStatus) + t.mu.RLock() + transfers = append(transfers, ChunkTransferStatus{ + ChunkFileId: t.ChunkFileId, + Path: t.Path, + BytesReceived: t.BytesReceived, + Status: t.Status, + LastErr: t.LastErr, + }) + t.mu.RUnlock() + return true + }) + return transfers +} + func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { dir, name := util.FullPath(key).DirAndName()