filer.sync: show active chunk transfers when sync progress stalls (#8889)

* filer.sync: show active chunk transfers when sync progress stalls

When the sync watermark is not advancing, print each in-progress chunk
transfer with its file path, bytes received so far, and current status
(downloading, uploading, or waiting with backoff duration). This helps
diagnose which files are blocking progress during replication.

Closes #8542

* filer.sync: include last error in stall diagnostics

* filer.sync: fix data races in ChunkTransferStatus

Add sync.RWMutex to ChunkTransferStatus and lock around all field
mutations in fetchAndWrite. ActiveTransfers now returns value copies
under RLock so callers get immutable snapshots.
This commit is contained in:
Chris Lu
2026-04-02 13:08:24 -07:00
committed by GitHub
parent a974190cb1
commit 9552e80b58
3 changed files with 70 additions and 0 deletions

View File

@@ -359,6 +359,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi
} }
var lastLogTsNs = time.Now().UnixNano() var lastLogTsNs = time.Now().UnixNano()
var lastProgressedTsNs int64
var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler))
processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error {
processor.AddSyncJob(resp) processor.AddSyncJob(resp)
@@ -372,6 +373,18 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi
now := time.Now().UnixNano() now := time.Now().UnixNano()
glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9))
lastLogTsNs = now lastLogTsNs = now
if offsetTsNs == lastProgressedTsNs {
for _, t := range filerSink.ActiveTransfers() {
if t.LastErr != "" {
glog.V(0).Infof(" %s %s: %d bytes received, %s, last error: %s",
t.ChunkFileId, t.Path, t.BytesReceived, t.Status, t.LastErr)
} else {
glog.V(0).Infof(" %s %s: %d bytes received, %s",
t.ChunkFileId, t.Path, t.BytesReceived, t.Status)
}
}
}
lastProgressedTsNs = offsetTsNs
// collect synchronous offset // collect synchronous offset
statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs))
return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs)

View File

@@ -241,6 +241,14 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
return "", fmt.Errorf("upload data: %w", err) return "", fmt.Errorf("upload data: %w", err)
} }
transferStatus := &ChunkTransferStatus{
ChunkFileId: sourceChunk.GetFileIdString(),
Path: path,
Status: "downloading",
}
fs.activeTransfers.Store(sourceChunk.GetFileIdString(), transferStatus)
defer fs.activeTransfers.Delete(sourceChunk.GetFileIdString())
eofBackoff := time.Duration(0) eofBackoff := time.Duration(0)
var partialData []byte var partialData []byte
var savedFilename string var savedFilename string
@@ -282,6 +290,11 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
fullData = data fullData = data
} }
transferStatus.mu.Lock()
transferStatus.BytesReceived = int64(len(fullData))
transferStatus.Status = "uploading"
transferStatus.mu.Unlock()
currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
fs, fs,
&filer_pb.AssignVolumeRequest{ &filer_pb.AssignVolumeRequest{
@@ -324,11 +337,21 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
return false return false
} }
transferStatus.mu.Lock()
transferStatus.LastErr = retryErr.Error()
transferStatus.mu.Unlock()
if isEofError(retryErr) { if isEofError(retryErr) {
eofBackoff = nextEofBackoff(eofBackoff) eofBackoff = nextEofBackoff(eofBackoff)
transferStatus.mu.Lock()
transferStatus.BytesReceived = int64(len(partialData))
transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff)
transferStatus.mu.Unlock()
glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v",
sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr)
time.Sleep(eofBackoff) time.Sleep(eofBackoff)
transferStatus.mu.Lock()
transferStatus.Status = "downloading"
transferStatus.mu.Unlock()
} else { } else {
glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
} }

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"math" "math"
"sync"
"github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/seaweedfs/seaweedfs/weed/wdclient"
@@ -20,6 +21,19 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util"
) )
// ChunkTransferStatus tracks the progress of a single chunk being replicated.
// Fields are guarded by mu: ChunkFileId and Path are immutable after creation,
// while BytesReceived, Status, and LastErr are updated by fetchAndWrite and
// read by ActiveTransfers.
//
// NOTE(review): ActiveTransfers returns value copies of this struct, so callers
// that range over the result also copy the embedded RWMutex; `go vet`'s
// copylocks check may flag that. A separate, mutex-free snapshot type would
// avoid it — confirm before changing the exported shape.
type ChunkTransferStatus struct {
	mu sync.RWMutex // guards BytesReceived, Status, and LastErr below

	// ChunkFileId is the source chunk's file id string; set once at creation.
	ChunkFileId string
	// Path is the destination file path being replicated; set once at creation.
	Path string
	// BytesReceived is the number of bytes fetched from the source so far
	// (full or partial data, depending on the transfer phase).
	BytesReceived int64
	// Status is the current phase: "downloading", "uploading", or "waiting 10s" etc.
	Status string
	// LastErr is the most recent retry error message, empty if none occurred.
	LastErr string
}
type FilerSink struct { type FilerSink struct {
filerSource *source.FilerSource filerSource *source.FilerSource
grpcAddress string grpcAddress string
@@ -35,6 +49,7 @@ type FilerSink struct {
isIncremental bool isIncremental bool
executor *util.LimitedConcurrentExecutor executor *util.LimitedConcurrentExecutor
signature int32 signature int32
activeTransfers sync.Map // chunkFileId -> *ChunkTransferStatus
} }
func init() { func init() {
@@ -101,6 +116,25 @@ func (fs *FilerSink) SetChunkConcurrency(concurrency int) {
} }
} }
// ActiveTransfers returns an immutable snapshot of all in-progress chunk transfers.
func (fs *FilerSink) ActiveTransfers() []ChunkTransferStatus {
	var snapshot []ChunkTransferStatus
	fs.activeTransfers.Range(func(_, v any) bool {
		status := v.(*ChunkTransferStatus)

		// Copy the mutable fields under the read lock so the caller's
		// snapshot is consistent even while fetchAndWrite keeps updating.
		status.mu.RLock()
		copied := ChunkTransferStatus{
			ChunkFileId:   status.ChunkFileId,
			Path:          status.Path,
			BytesReceived: status.BytesReceived,
			Status:        status.Status,
			LastErr:       status.LastErr,
		}
		status.mu.RUnlock()

		snapshot = append(snapshot, copied)
		return true
	})
	return snapshot
}
func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error {
dir, name := util.FullPath(key).DirAndName() dir, name := util.FullPath(key).DirAndName()