diff --git a/weed/command/filer_sync.go b/weed/command/filer_sync.go index 2f4aa43f2..7b9c85c99 100644 --- a/weed/command/filer_sync.go +++ b/weed/command/filer_sync.go @@ -359,6 +359,7 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi } var lastLogTsNs = time.Now().UnixNano() + var lastProgressedTsNs int64 var clientName = fmt.Sprintf("syncFrom_%s_To_%s", string(sourceFiler), string(targetFiler)) processEventFnWithOffset := pb.AddOffsetFunc(func(resp *filer_pb.SubscribeMetadataResponse) error { processor.AddSyncJob(resp) @@ -372,6 +373,13 @@ func doSubscribeFilerMetaChanges(clientId int32, clientEpoch int32, sourceGrpcDi now := time.Now().UnixNano() glog.V(0).Infof("sync %s to %s progressed to %v %0.2f/sec", sourceFiler, targetFiler, time.Unix(0, offsetTsNs), float64(counter)/(float64(now-lastLogTsNs)/1e9)) lastLogTsNs = now + if offsetTsNs == lastProgressedTsNs { + for _, t := range filerSink.ActiveTransfers() { + glog.V(0).Infof(" %s %s: %d bytes received, %s", + t.ChunkFileId, t.Path, t.BytesReceived, t.Status) + } + } + lastProgressedTsNs = offsetTsNs // collect synchronous offset statsCollect.FilerSyncOffsetGauge.WithLabelValues(sourceFiler.String(), targetFiler.String(), clientName, sourcePath).Set(float64(offsetTsNs)) return setOffset(targetGrpcDialOption, targetFiler, getSignaturePrefixByPath(sourcePath), sourceFilerSignature, offsetTsNs) diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index 05db3f4dd..35ce29501 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -241,6 +241,14 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, return "", fmt.Errorf("upload data: %w", err) } + transferStatus := &ChunkTransferStatus{ + ChunkFileId: sourceChunk.GetFileIdString(), + Path: path, + Status: "downloading", + } + fs.activeTransfers.Store(sourceChunk.GetFileIdString(), transferStatus) + defer fs.activeTransfers.Delete(sourceChunk.GetFileIdString()) + eofBackoff := time.Duration(0) var partialData []byte var savedFilename string @@ -282,6 +290,9 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, fullData = data } + transferStatus.BytesReceived = int64(len(fullData)) + transferStatus.Status = "uploading" + currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry( fs, &filer_pb.AssignVolumeRequest{ @@ -326,9 +337,12 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, } if isEofError(retryErr) { eofBackoff = nextEofBackoff(eofBackoff) + transferStatus.BytesReceived = int64(len(partialData)) + transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff) glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v", sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr) time.Sleep(eofBackoff) + transferStatus.Status = "downloading" } else { glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr) } diff --git a/weed/replication/sink/filersink/filer_sink.go b/weed/replication/sink/filersink/filer_sink.go index 1bda6d1a0..b42c89fab 100644 --- a/weed/replication/sink/filersink/filer_sink.go +++ b/weed/replication/sink/filersink/filer_sink.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" @@ -20,6 +21,14 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" ) +// ChunkTransferStatus tracks the progress of a single chunk being replicated. +type ChunkTransferStatus struct { + ChunkFileId string + Path string + BytesReceived int64 + Status string // "downloading", "uploading", or "waiting 10s" etc. +} + type FilerSink struct { filerSource *source.FilerSource grpcAddress string @@ -35,6 +44,7 @@ type FilerSink struct { isIncremental bool executor *util.LimitedConcurrentExecutor signature int32 + activeTransfers sync.Map // chunkFileId -> *ChunkTransferStatus } func init() { @@ -101,6 +111,16 @@ func (fs *FilerSink) SetChunkConcurrency(concurrency int) { } } +// ActiveTransfers returns a snapshot of all in-progress chunk transfers. +func (fs *FilerSink) ActiveTransfers() []*ChunkTransferStatus { + var transfers []*ChunkTransferStatus + fs.activeTransfers.Range(func(key, value any) bool { + transfers = append(transfers, value.(*ChunkTransferStatus)) + return true + }) + return transfers +} + func (fs *FilerSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks bool, signatures []int32) error { dir, name := util.FullPath(key).DirAndName()