Files
seaweedfs/weed/replication/sink/filersink/fetch_write.go
Chris Lu 2d4ea8c665 filer.sync: show active chunk transfers when sync progress stalls
When the sync watermark is not advancing, print each in-progress chunk
transfer with its file path, bytes received so far, and current status
(downloading, uploading, or waiting with backoff duration). This helps
diagnose which files are blocking progress during replication.

Closes #8542
2026-04-02 12:14:25 -07:00

463 lines
13 KiB
Go

package filersink
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/schollz/progressbar/v3"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// replicateChunks copies every chunk in sourceChunks to this sink's storage
// and returns the destination chunks in matching order. Plain data chunks are
// replicated concurrently via fs.executor; manifest chunks are handled inline
// and sequentially because they recurse back into replicateChunks for their
// resolved data chunks. The first error observed stops submission of further
// work and is returned as err.
func (fs *FilerSink) replicateChunks(ctx context.Context, sourceChunks []*filer_pb.FileChunk, path string, sourceMtime int64) (replicatedChunks []*filer_pb.FileChunk, err error) {
	if len(sourceChunks) == 0 {
		return
	}
	if ctx == nil {
		ctx = context.Background()
	}
	// a simple progress bar. Not ideal. Fix me.
	// Only shown for multi-chunk files; single-chunk transfers skip it.
	var bar *progressbar.ProgressBar
	if len(sourceChunks) > 1 {
		name := filepath.Base(path)
		bar = progressbar.NewOptions64(int64(len(sourceChunks)),
			progressbar.OptionClearOnFinish(),
			progressbar.OptionOnCompletion(func() {
				fmt.Fprint(os.Stderr, "\n")
			}),
			progressbar.OptionFullWidth(),
			progressbar.OptionSetDescription(name),
		)
	}
	replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))

	// setError records only the FIRST error; hasError lets the submission
	// loop bail out early. Both guard the named return err with errLock
	// because worker goroutines call setError concurrently.
	var errLock sync.Mutex
	setError := func(e error) {
		if e == nil {
			return
		}
		errLock.Lock()
		if err == nil {
			err = e
		}
		errLock.Unlock()
	}
	hasError := func() bool {
		errLock.Lock()
		defer errLock.Unlock()
		return err != nil
	}

	var wg sync.WaitGroup
	for chunkIndex, sourceChunk := range sourceChunks {
		if hasError() {
			// Stop submitting new chunks; work already in flight is
			// drained by wg.Wait() below.
			break
		}
		if sourceChunk.IsChunkManifest {
			// Manifest chunks recurse into replicateChunks, so they are
			// replicated synchronously rather than through the executor.
			replicatedChunk, replicateErr := fs.replicateOneManifestChunk(ctx, sourceChunk, path, sourceMtime)
			if replicateErr != nil {
				setError(replicateErr)
				break
			}
			replicatedChunks[chunkIndex] = replicatedChunk
			if bar != nil {
				bar.Add(1)
			}
			continue
		}
		wg.Add(1)
		// Per-iteration copies for the closure (pre-Go 1.22 loop
		// variable capture semantics).
		index, source := chunkIndex, sourceChunk
		fs.executor.Execute(func() {
			defer wg.Done()
			var replicatedChunk *filer_pb.FileChunk
			retryErr := util.Retry("replicate chunks", func() error {
				chunk, e := fs.replicateOneChunk(source, path, sourceMtime)
				if e != nil {
					return e
				}
				replicatedChunk = chunk
				return nil
			})
			if retryErr != nil {
				setError(retryErr)
				return
			}
			// Each worker writes a distinct slice index, so no extra
			// locking is needed for this assignment.
			replicatedChunks[index] = replicatedChunk
			if bar != nil {
				bar.Add(1)
			}
		})
	}
	wg.Wait()

	return
}
// replicateOneChunk transfers a single data chunk from the source filer to
// this sink and returns the destination chunk, which keeps the source chunk's
// offset, size, timestamps, encryption and SSE fields, and records the source
// file id for traceability.
func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
	sourceFileId := sourceChunk.GetFileIdString()

	newFileId, fetchErr := fs.fetchAndWrite(sourceChunk, path, sourceMtime)
	if fetchErr != nil {
		return nil, fmt.Errorf("copy %s: %v", sourceFileId, fetchErr)
	}

	replicated := &filer_pb.FileChunk{
		FileId:       newFileId,
		Offset:       sourceChunk.Offset,
		Size:         sourceChunk.Size,
		ModifiedTsNs: sourceChunk.ModifiedTsNs,
		ETag:         sourceChunk.ETag,
		SourceFileId: sourceFileId,
		CipherKey:    sourceChunk.CipherKey,
		IsCompressed: sourceChunk.IsCompressed,
		SseType:      sourceChunk.SseType,
		SseMetadata:  sourceChunk.SseMetadata,
	}
	return replicated, nil
}
// replicateOneManifestChunk replicates a chunk-manifest chunk: it resolves
// the manifest into its data chunks, replicates those to this sink, rebuilds
// a manifest over the replicated chunks, uploads the new manifest, and
// returns a chunk pointing at it (flagged IsChunkManifest).
func (fs *FilerSink) replicateOneManifestChunk(ctx context.Context, sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (*filer_pb.FileChunk, error) {
	sourceFileId := sourceChunk.GetFileIdString()

	dataChunks, resolveErr := filer.ResolveOneChunkManifest(ctx, fs.filerSource.LookupFileId, sourceChunk)
	if resolveErr != nil {
		return nil, fmt.Errorf("resolve manifest %s: %w", sourceFileId, resolveErr)
	}

	replicated, replicateErr := fs.replicateChunks(ctx, dataChunks, path, sourceMtime)
	if replicateErr != nil {
		return nil, fmt.Errorf("replicate manifest data chunks %s: %w", sourceFileId, replicateErr)
	}

	// Serialize copies so BeforeEntrySerialization cannot mutate the chunks
	// returned by replicateChunks.
	copies := make([]*filer_pb.FileChunk, len(replicated))
	for i := range replicated {
		c := *replicated[i]
		copies[i] = &c
	}
	filer_pb.BeforeEntrySerialization(copies)
	manifestBytes, marshalErr := proto.Marshal(&filer_pb.FileChunkManifest{
		Chunks: copies,
	})
	if marshalErr != nil {
		return nil, fmt.Errorf("marshal manifest %s: %w", sourceFileId, marshalErr)
	}

	newFileId, uploadErr := fs.uploadManifestChunk(path, sourceMtime, sourceFileId, manifestBytes)
	if uploadErr != nil {
		return nil, uploadErr
	}

	return &filer_pb.FileChunk{
		FileId:          newFileId,
		Offset:          sourceChunk.Offset,
		Size:            sourceChunk.Size,
		ModifiedTsNs:    sourceChunk.ModifiedTsNs,
		ETag:            sourceChunk.ETag,
		SourceFileId:    sourceFileId,
		IsChunkManifest: true,
	}, nil
}
// uploadManifestChunk uploads serialized manifest data as a new chunk on the
// destination and returns its file id. It retries until the upload succeeds
// or the retry predicate decides the source entry is stale (modified after
// sourceMtime, or deleted), in which case further retries would replicate
// obsolete data and are abandoned.
func (fs *FilerSink) uploadManifestChunk(path string, sourceMtime int64, sourceFileId string, manifestData []byte) (fileId string, err error) {
	uploader, err := operation.NewUploader()
	if err != nil {
		glog.V(0).Infof("upload manifest data %v: %v", sourceFileId, err)
		return "", fmt.Errorf("upload manifest data: %w", err)
	}
	retryName := fmt.Sprintf("replicate manifest chunk %s", sourceFileId)
	err = util.RetryUntil(retryName, func() error {
		// Assign a volume and upload in one call, using the sink's
		// replication/collection/TTL/disk settings for the assignment.
		currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
			fs,
			&filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: fs.replication,
				Collection:  fs.collection,
				TtlSec:      fs.ttlSec,
				DataCenter:  fs.dataCenter,
				DiskType:    fs.diskType,
				Path:        path,
			},
			&operation.UploadOption{
				Filename:          "",
				Cipher:            false,
				IsInputCompressed: false,
				MimeType:          "application/octet-stream",
				PairMap:           nil,
				RetryForever:      false,
			},
			func(host, fileId string) string {
				return fs.buildUploadUrl(host, fileId)
			},
			bytes.NewReader(manifestData),
		)
		if uploadErr != nil {
			return fmt.Errorf("upload manifest data: %w", uploadErr)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload manifest result: %v", uploadResult.Error)
		}
		// Publish the new file id only after a fully successful upload.
		fileId = currentFileId
		return nil
	}, func(uploadErr error) (shouldContinue bool) {
		if fs.hasSourceNewerVersion(path, sourceMtime) {
			// Source changed (or vanished) since this replication started.
			glog.V(1).Infof("skip retrying stale source manifest %s for %s: %v", sourceFileId, path, uploadErr)
			return false
		}
		glog.V(0).Infof("replicate manifest %s for %s: %v", sourceFileId, path, uploadErr)
		return true
	})
	if err != nil {
		return "", err
	}
	return fileId, nil
}
// fetchAndWrite downloads one data chunk from the source filer and uploads it
// to this sink, returning the destination file id. While running it publishes
// a ChunkTransferStatus in fs.activeTransfers (keyed by the source chunk's
// file id) so that a stalled sync can report per-chunk progress: the status
// moves through "downloading", "uploading", and "waiting <backoff>" when the
// source connection drops.
//
// Downloads are resumable: bytes received before a read error are kept in
// partialData, and the next attempt passes len(partialData) to ReadPart so
// reading continues from that offset. EOF-style interruptions back off
// exponentially (via nextEofBackoff, 10s doubling up to 2 minutes) instead of
// failing fast. Retrying stops early when the source entry is newer than
// sourceMtime or no longer exists.
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string, sourceMtime int64) (fileId string, err error) {
	uploader, err := operation.NewUploader()
	if err != nil {
		glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
		return "", fmt.Errorf("upload data: %w", err)
	}

	// Register this transfer for progress reporting; unregistered on return.
	transferStatus := &ChunkTransferStatus{
		ChunkFileId: sourceChunk.GetFileIdString(),
		Path:        path,
		Status:      "downloading",
	}
	fs.activeTransfers.Store(sourceChunk.GetFileIdString(), transferStatus)
	defer fs.activeTransfers.Delete(sourceChunk.GetFileIdString())

	eofBackoff := time.Duration(0)
	var partialData []byte // bytes accumulated across interrupted read attempts
	var savedFilename string
	var savedHeader http.Header
	var savedSourceUrl string
	retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString())
	err = util.RetryUntil(retryName, func() error {
		// Start (or resume, at offset len(partialData)) reading the chunk.
		filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString(), int64(len(partialData)))
		if readErr != nil {
			return fmt.Errorf("read part %s: %w", sourceChunk.GetFileIdString(), readErr)
		}
		defer util_http.CloseResponse(resp)

		// Save metadata from first successful response
		if len(partialData) == 0 {
			savedFilename = filename
			savedHeader = header
			if resp.Request != nil && resp.Request.URL != nil {
				savedSourceUrl = resp.Request.URL.String()
			}
		}

		// Read the response body
		data, readBodyErr := io.ReadAll(resp.Body)
		if readBodyErr != nil {
			// Keep whatever bytes we received before the error
			partialData = append(partialData, data...)
			return fmt.Errorf("read body: %w", readBodyErr)
		}

		// Combine with previously accumulated partial data
		var fullData []byte
		if len(partialData) > 0 {
			fullData = append(partialData, data...)
			glog.V(0).Infof("resumed reading %s, got %d + %d = %d bytes",
				sourceChunk.GetFileIdString(), len(partialData), len(data), len(fullData))
			// Reset so a later upload failure re-reads from offset 0.
			partialData = nil
		} else {
			fullData = data
		}

		transferStatus.BytesReceived = int64(len(fullData))
		transferStatus.Status = "uploading"

		currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
			fs,
			&filer_pb.AssignVolumeRequest{
				Count:       1,
				Replication: fs.replication,
				Collection:  fs.collection,
				TtlSec:      fs.ttlSec,
				DataCenter:  fs.dataCenter,
				DiskType:    fs.diskType,
				Path:        path,
			},
			&operation.UploadOption{
				Filename: savedFilename,
				Cipher:   false,
				// Preserve the source's compression as-is.
				IsInputCompressed: "gzip" == savedHeader.Get("Content-Encoding"),
				MimeType:          savedHeader.Get("Content-Type"),
				PairMap:           nil,
				RetryForever:      false,
				SourceUrl:         savedSourceUrl,
			},
			func(host, fileId string) string {
				fileUrl := fs.buildUploadUrl(host, fileId)
				glog.V(4).Infof("replicating %s to %s header:%+v", savedFilename, fileUrl, savedHeader)
				return fileUrl
			},
			util.NewBytesReader(fullData),
		)
		if uploadErr != nil {
			return fmt.Errorf("upload data: %w", uploadErr)
		}
		if uploadResult.Error != "" {
			return fmt.Errorf("upload result: %v", uploadResult.Error)
		}
		// Success: reset the EOF backoff and publish the new file id.
		eofBackoff = 0
		fileId = currentFileId
		return nil
	}, func(retryErr error) (shouldContinue bool) {
		if fs.hasSourceNewerVersion(path, sourceMtime) {
			// The source changed under us; stop retrying stale data.
			glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
			return false
		}
		if isEofError(retryErr) {
			// Interrupted transfer: report partial progress, wait with
			// exponential backoff, then retry (resuming the download).
			eofBackoff = nextEofBackoff(eofBackoff)
			transferStatus.BytesReceived = int64(len(partialData))
			transferStatus.Status = fmt.Sprintf("waiting %v", eofBackoff)
			glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v",
				sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr)
			time.Sleep(eofBackoff)
			transferStatus.Status = "downloading"
		} else {
			glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
		}
		return true
	})
	if err != nil {
		return "", err
	}
	return fileId, nil
}
const maxEofBackoff = 2 * time.Minute
// nextEofBackoff returns the next backoff duration for unexpected EOF errors.
// It starts at 10s, doubles each time, and caps at 2 minutes.
func nextEofBackoff(current time.Duration) time.Duration {
if current < 10*time.Second {
return 10 * time.Second
}
current *= 2
if current > maxEofBackoff {
current = maxEofBackoff
}
return current
}
func isEofError(err error) bool {
if err == nil {
return false
}
return errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, io.EOF)
}
// buildUploadUrl returns the URL a chunk upload should be sent to. When
// writeChunkByFiler is enabled the upload is proxied through the filer at
// fs.address; otherwise it goes straight to the assigned volume server host.
func (fs *FilerSink) buildUploadUrl(host, fileId string) string {
	if !fs.writeChunkByFiler {
		return fmt.Sprintf("http://%s/%s", host, fileId)
	}
	return fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
}
// hasSourceNewerVersion reports whether the source entry corresponding to
// targetPath was modified after sourceMtime, or has been deleted. Either way
// the in-flight replication is stale and retrying it is pointless. Lookup
// failures and unmappable paths conservatively report false (keep retrying).
func (fs *FilerSink) hasSourceNewerVersion(targetPath string, sourceMtime int64) bool {
	if fs.filerSource == nil || sourceMtime <= 0 {
		return false
	}
	sourcePath, mapped := fs.targetPathToSourcePath(targetPath)
	if !mapped {
		return false
	}
	entry, lookupErr := filer_pb.GetEntry(context.Background(), fs.filerSource, sourcePath)
	if lookupErr != nil {
		glog.V(1).Infof("lookup source entry %s: %v", sourcePath, lookupErr)
		return false
	}
	if entry == nil {
		// Deleted on the source: definitely stale.
		glog.V(1).Infof("source entry %s no longer exists", sourcePath)
		return true
	}
	attr := entry.Attributes
	return attr != nil && attr.Mtime > sourceMtime
}
// targetPathToSourcePath maps a path under this sink's destination directory
// (fs.dir) back to the corresponding path under the source filer's directory.
// The boolean result is false when targetPath does not live under fs.dir or
// no source filer is configured.
func (fs *FilerSink) targetPathToSourcePath(targetPath string) (util.FullPath, bool) {
	if fs.filerSource == nil {
		return "", false
	}

	// Drop a single trailing slash, but keep bare "/" as the root.
	normalize := func(p string) string {
		trimmed := strings.TrimSuffix(p, "/")
		if trimmed == "" {
			return "/"
		}
		return trimmed
	}

	sourceRoot := normalize(fs.filerSource.Dir)
	targetRoot := normalize(fs.dir)
	target := normalize(targetPath)

	var relative string
	if targetRoot == "/" {
		relative = strings.TrimPrefix(target, "/")
	} else if target == targetRoot {
		relative = ""
	} else if prefix := targetRoot + "/"; strings.HasPrefix(target, prefix) {
		relative = target[len(prefix):]
	} else {
		return "", false
	}

	if relative == "" {
		return util.FullPath(sourceRoot), true
	}
	return util.FullPath(sourceRoot).Child(relative), true
}
// Compile-time check that *FilerSink satisfies filer_pb.FilerClient. The
// typed-nil form is the Go convention for interface assertions and avoids
// constructing a FilerSink value at package init.
var _ filer_pb.FilerClient = (*FilerSink)(nil)
// WithFilerClient dials this sink's filer over gRPC and runs fn with a
// SeaweedFilerClient bound to that connection, returning fn's error.
func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
	invoke := func(conn *grpc.ClientConn) error {
		return fn(filer_pb.NewSeaweedFilerClient(conn))
	}
	return pb.WithGrpcClient(streamingMode, fs.signature, invoke, fs.grpcAddress, false, fs.grpcDialOption)
}
// AdjustedUrl returns the volume location URL unchanged; this sink performs
// no address rewriting.
func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
	return location.Url
}
// GetDataCenter returns the data center configured for this sink.
func (fs *FilerSink) GetDataCenter() string {
	return fs.dataCenter
}