fix(replication): resume partial chunk reads on EOF instead of re-downloading (#8607)
* fix(replication): resume partial chunk reads on EOF instead of re-downloading

  When replicating chunks and the source connection drops mid-transfer, accumulate the bytes already received and retry with a Range header to fetch only the remaining bytes. This avoids re-downloading potentially large chunks from scratch on each retry, reducing load on busy source servers and speeding up recovery.

* test(replication): add tests for downloadWithRange including gzip partial reads

  Tests cover:
  - No offset (no Range header sent)
  - With offset (Range header verified)
  - Content-Disposition filename extraction
  - Partial read + resume: server drops connection mid-transfer, client resumes with Range from the offset of received bytes
  - Gzip partial read + resume: first response is gzip-encoded (Go auto-decompresses), connection drops, resume request gets decompressed data (Go doesn't add Accept-Encoding when Range is set, so the server decompresses), combined bytes match original

* fix(replication): address PR review comments

  - Consolidate downloadWithRange into DownloadFile with optional offset parameter (variadic), eliminating code duplication (DRY)
  - Validate HTTP response status: require 206 + correct Content-Range when offset > 0, reject when server ignores Range header
  - Use if/else for fullData assignment for clarity
  - Add test for rejected Range (server returns 200 instead of 206)

* refactor(replication): remove unused ReplicationSource interface

  The interface was never referenced and its signature didn't match the actual FilerSource.ReadPart method.

---------

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
@@ -241,17 +242,44 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
|
||||
}
|
||||
|
||||
eofBackoff := time.Duration(0)
|
||||
var partialData []byte
|
||||
var savedFilename string
|
||||
var savedHeader http.Header
|
||||
var savedSourceUrl string
|
||||
retryName := fmt.Sprintf("replicate chunk %s", sourceChunk.GetFileIdString())
|
||||
err = util.RetryUntil(retryName, func() error {
|
||||
filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString())
|
||||
filename, header, resp, readErr := fs.filerSource.ReadPart(sourceChunk.GetFileIdString(), int64(len(partialData)))
|
||||
if readErr != nil {
|
||||
return fmt.Errorf("read part %s: %w", sourceChunk.GetFileIdString(), readErr)
|
||||
}
|
||||
defer util_http.CloseResponse(resp)
|
||||
|
||||
sourceUrl := ""
|
||||
if resp.Request != nil && resp.Request.URL != nil {
|
||||
sourceUrl = resp.Request.URL.String()
|
||||
// Save metadata from first successful response
|
||||
if len(partialData) == 0 {
|
||||
savedFilename = filename
|
||||
savedHeader = header
|
||||
if resp.Request != nil && resp.Request.URL != nil {
|
||||
savedSourceUrl = resp.Request.URL.String()
|
||||
}
|
||||
}
|
||||
|
||||
// Read the response body
|
||||
data, readBodyErr := io.ReadAll(resp.Body)
|
||||
if readBodyErr != nil {
|
||||
// Keep whatever bytes we received before the error
|
||||
partialData = append(partialData, data...)
|
||||
return fmt.Errorf("read body: %w", readBodyErr)
|
||||
}
|
||||
|
||||
// Combine with previously accumulated partial data
|
||||
var fullData []byte
|
||||
if len(partialData) > 0 {
|
||||
fullData = append(partialData, data...)
|
||||
glog.V(0).Infof("resumed reading %s, got %d + %d = %d bytes",
|
||||
sourceChunk.GetFileIdString(), len(partialData), len(data), len(fullData))
|
||||
partialData = nil
|
||||
} else {
|
||||
fullData = data
|
||||
}
|
||||
|
||||
currentFileId, uploadResult, uploadErr, _ := uploader.UploadWithRetry(
|
||||
@@ -266,20 +294,20 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
|
||||
Path: path,
|
||||
},
|
||||
&operation.UploadOption{
|
||||
Filename: filename,
|
||||
Filename: savedFilename,
|
||||
Cipher: false,
|
||||
IsInputCompressed: "gzip" == header.Get("Content-Encoding"),
|
||||
MimeType: header.Get("Content-Type"),
|
||||
IsInputCompressed: "gzip" == savedHeader.Get("Content-Encoding"),
|
||||
MimeType: savedHeader.Get("Content-Type"),
|
||||
PairMap: nil,
|
||||
RetryForever: false,
|
||||
SourceUrl: sourceUrl,
|
||||
SourceUrl: savedSourceUrl,
|
||||
},
|
||||
func(host, fileId string) string {
|
||||
fileUrl := fs.buildUploadUrl(host, fileId)
|
||||
glog.V(4).Infof("replicating %s to %s header:%+v", filename, fileUrl, header)
|
||||
glog.V(4).Infof("replicating %s to %s header:%+v", savedFilename, fileUrl, savedHeader)
|
||||
return fileUrl
|
||||
},
|
||||
resp.Body,
|
||||
util.NewBytesReader(fullData),
|
||||
)
|
||||
if uploadErr != nil {
|
||||
return fmt.Errorf("upload data: %w", uploadErr)
|
||||
@@ -291,17 +319,18 @@ func (fs *FilerSink) fetchAndWrite(sourceChunk *filer_pb.FileChunk, path string,
|
||||
eofBackoff = 0
|
||||
fileId = currentFileId
|
||||
return nil
|
||||
}, func(uploadErr error) (shouldContinue bool) {
|
||||
}, func(retryErr error) (shouldContinue bool) {
|
||||
if fs.hasSourceNewerVersion(path, sourceMtime) {
|
||||
glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr)
|
||||
glog.V(1).Infof("skip retrying stale source %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
|
||||
return false
|
||||
}
|
||||
if isEofError(uploadErr) {
|
||||
if isEofError(retryErr) {
|
||||
eofBackoff = nextEofBackoff(eofBackoff)
|
||||
glog.V(0).Infof("source connection interrupted while replicating %s for %s, backing off %v: %v", sourceChunk.GetFileIdString(), path, eofBackoff, uploadErr)
|
||||
glog.V(0).Infof("source connection interrupted while replicating %s for %s (%d bytes received so far), backing off %v: %v",
|
||||
sourceChunk.GetFileIdString(), path, len(partialData), eofBackoff, retryErr)
|
||||
time.Sleep(eofBackoff)
|
||||
} else {
|
||||
glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, uploadErr)
|
||||
glog.V(0).Infof("replicate %s for %s: %v", sourceChunk.GetFileIdString(), path, retryErr)
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user