From 597d383ca484af0f517b70d4eba87ca84038048b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Thu, 2 Apr 2026 13:04:21 -0700 Subject: [PATCH] filer.sync: fix data races in ChunkTransferStatus Add sync.RWMutex to ChunkTransferStatus and lock around all field mutations in fetchAndWrite. ActiveTransfers now returns value copies under RLock so callers get immutable snapshots. --- .../replication/sink/filersink/fetch_write.go | 8 +++++++ weed/replication/sink/filersink/filer_sink.go | 21 +++++++++++++++---- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d711b76d3..a2a12f95a 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -290,8 +290,10 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, fullData = data } + transferStatus.mu.Lock() transferStatus.BytesReceived = int64(len(fullData)) transferStatus.Status = "uploading" + transferStatus.mu.Unlock() currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, @@ -335,15 +337,21 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) return false } + transferStatus.mu.Lock() transferStatus.LastErr = retryErr.Error() + transferStatus.mu.Unlock() if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) + transferStatus.mu.Lock() transferStatus.BytesReceived = int64(len(partialData)) transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff) + transferStatus.mu.Unlock() glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) time.Sleep(eofBackoff) + transferStatus.mu.Lock() transferStatus.Status = "downloading" + transferStatus.mu.Unlock() } else { glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 6bdeb10ee..a51d1c050 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -22,7 +22,11 @@ import ( ) // ChunkTransferStatus tracks the progress of a single chunk being replicated. +// Fields are guarded by mu: ChunkFileId and Path are immutable after creation, +// while BytesReceived, Status, and LastErr are updated by fetchAndWrite and +// read by ActiveTransfers. type ChunkTransferStatus struct { + mu sync.RWMutex ChunkFileId string Path string BytesReceived int64 @@ -112,11 +116,20 @@ func (fs *FilerSink) SetChunkConcurrency(concurrency int) { } } -// ActiveTransfers returns a snapshot of all in-progress chunk transfers. -func (fs *FilerSink) ActiveTransfers() []*ChunkTransferStatus { - var transfers []*ChunkTransferStatus +// ActiveTransfers returns an immutable snapshot of all in-progress chunk transfers. +func (fs *FilerSink) ActiveTransfers() []ChunkTransferStatus { + var transfers []ChunkTransferStatus fs.activeTransfers.Range(func(key, value any) bool { - transfers = append(transfers, value.(*ChunkTransferStatus)) + t := value.(*ChunkTransferStatus) + t.mu.RLock() + transfers = append(transfers, ChunkTransferStatus{ + ChunkFileId: t.ChunkFileId, + Path: t.Path, + BytesReceived: t.BytesReceived, + Status: t.Status, + LastErr: t.LastErr, + }) + t.mu.RUnlock() return true }) return transfers