Files
seaweedFS/weed/replication/sink/filersink/fetch_write.go
Chris Lu cc2edfaf68 fix: enable RetryForever for active-active cluster sync to prevent out-of-sync (#7840)
Fixes #7230

When a cluster goes down during file replication, the chunk upload process
would fail after a limited number of retries. Once the remote cluster came
back online, those failed uploads were never retried, leaving the clusters
out-of-sync.

This change enables the RetryForever flag in the UploadOption when
replicating chunks between filers. This ensures that upload operations
will keep retrying indefinitely, and once the remote cluster comes back
online, the pending uploads will automatically succeed.

Users no longer need to manually run fs.meta.save and fs.meta.load as
a workaround for out-of-sync clusters.
2025-12-22 00:58:23 -08:00

161 lines
4.4 KiB
Go

package filersink
import (
"fmt"
"os"
"path/filepath"
"sync"
"github.com/schollz/progressbar/v3"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
func (fs *FilerSink) replicateChunks(sourceChunks []*filer_pb.FileChunk, path string) (replicatedChunks []*filer_pb.FileChunk, err error) {
if len(sourceChunks) == 0 {
return
}
// a simple progress bar. Not ideal. Fix me.
var bar *progressbar.ProgressBar
if len(sourceChunks) > 1 {
name := filepath.Base(path)
bar = progressbar.NewOptions64(int64(len(sourceChunks)),
progressbar.OptionClearOnFinish(),
progressbar.OptionOnCompletion(func() {
fmt.Fprint(os.Stderr, "\n")
}),
progressbar.OptionFullWidth(),
progressbar.OptionSetDescription(name),
)
}
replicatedChunks = make([]*filer_pb.FileChunk, len(sourceChunks))
var wg sync.WaitGroup
for chunkIndex, sourceChunk := range sourceChunks {
wg.Add(1)
index, source := chunkIndex, sourceChunk
fs.executor.Execute(func() {
defer wg.Done()
util.Retry("replicate chunks", func() error {
replicatedChunk, e := fs.replicateOneChunk(source, path)
if e != nil {
err = e
return e
}
replicatedChunks[index] = replicatedChunk
if bar != nil {
bar.Add(1)
}
err = nil
return nil
})
})
}
wg.Wait()
return
}
func (fs *FilerSink) replicateOneChunk(sourceChunk *filer_pb.FileChunk, path string) (*filer_pb.FileChunk, error) {
fileId, err := fs.fetchAndWrite(sourceChunk, path)
if err != nil {
return nil, fmt.Errorf("copy %s: %v", sourceChunk.GetFileIdString(), err)
}
return &filer_pb.FileChunk{
FileId: fileId,
Offset: sourceChunk.Offset,
Size: sourceChunk.Size,
ModifiedTsNs: sourceChunk.ModifiedTsNs,
ETag: sourceChunk.ETag,
SourceFileId: sourceChunk.GetFileIdString(),
CipherKey: sourceChunk.CipherKey,
IsCompressed: sourceChunk.IsCompressed,
}, nil
}
func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string) (fileId string, err error) {
filename, header, resp, err := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
if err != nil {
return "", fmt.Errorf("read part %s: %v", sourceChunk.GetFileIdString(), err)
}
defer util_http.CloseResponse(resp)
uploader, err := operation.NewUploader()
if err != nil {
glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
return "", fmt.Errorf("upload data: %w", err)
}
fileId, uploadResult, err, _ := uploader.UploadWithRetry(
fs,
&filer_pb.AssignVolumeRequest{
Count: 1,
Replication: fs.replication,
Collection: fs.collection,
TtlSec: fs.ttlSec,
DataCenter: fs.dataCenter,
DiskType: fs.diskType,
Path: path,
},
&operation.UploadOption{
Filename: filename,
Cipher: false,
IsInputCompressed: "gzip" == header.Get("Content-Encoding"),
MimeType: header.Get("Content-Type"),
PairMap: nil,
RetryForever: true,
},
func(host, fileId string) string {
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
if fs.writeChunkByFiler {
fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", fs.address, fileId)
}
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
return fileUrl
},
resp.Body,
)
if err != nil {
glog.V(0).Infof("upload source data %v: %v", sourceChunk.GetFileIdString(), err)
return "", fmt.Errorf("upload data: %w", err)
}
if uploadResult.Error != "" {
glog.V(0).Infof("upload failure %v: %v", filename, err)
return "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
return
}
var _ = filer_pb.FilerClient(&FilerSink{})
func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithGrpcClient(streamingMode, fs.signature, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
}, fs.grpcAddress, false, fs.grpcDialOption)
}
func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
func (fs *FilerSink) GetDataCenter() string {
return fs.dataCenter
}