* feat(s3): add concurrent chunk prefetch for large file downloads Add a pipe-based prefetch pipeline that overlaps chunk fetching with response writing during S3 GetObject, SSE downloads, and filer proxy. While chunk N streams to the HTTP response, fetch goroutines for the next K chunks establish HTTP connections to volume servers ahead of time, eliminating the RTT gap between sequential chunk fetches. Uses io.Pipe for minimal memory overhead (~1MB per download regardless of chunk size, vs buffering entire chunks). Also increases the streaming read buffer from 64KB to 256KB to reduce syscall overhead. Benchmark results (64KB chunks, prefetch=4): - 0ms latency: 1058 → 2362 MB/s (2.2× faster) - 5ms latency: 11.0 → 41.7 MB/s (3.8× faster) - 10ms latency: 5.9 → 23.3 MB/s (4.0× faster) - 20ms latency: 3.1 → 12.1 MB/s (3.9× faster) * fix: address review feedback for prefetch pipeline - Fix data race: use *chunkPipeResult (pointer) on channel to avoid copying struct while fetch goroutines write to it. Confirmed clean with -race detector. - Remove concurrent map write: retryWithCacheInvalidation no longer updates fileId2Url map. Producer only reads it; consumer never writes. - Use mem.Allocate/mem.Free for copy buffer to reduce GC pressure. - Add local cancellable context so consumer errors (client disconnect) immediately stop the producer and all in-flight fetch goroutines. * fix(test): remove dead code and add Range header support in test server - Remove unused allData variable in makeChunksAndServer - Add Range header handling to createTestServer for partial chunk read coverage (206 Partial Content, 416 Range Not Satisfiable) * fix: correct retry condition and goroutine leak in prefetch pipeline - Fix retry condition: use result.fetchErr/result.written instead of copied to decide cache-invalidation retry. The old condition wrongly triggered retry when the fetch succeeded but the response writer failed on the first write (copied==0 despite fetcher having data). Now matches the sequential path (stream.go:197) which checks whether the fetcher itself wrote zero bytes. - Fix goroutine leak: when the producer's send to the results channel is interrupted by context cancellation, the fetch goroutine was already launched but the result was never sent to the channel. The drain loop couldn't handle it. Now waits on result.done before returning so every fetch goroutine is properly awaited.
681 lines
16 KiB
Go
681 lines
16 KiB
Go
package http
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/util"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/mem"
|
|
"github.com/seaweedfs/seaweedfs/weed/util/request_id"
|
|
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog"
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/security"
|
|
)
|
|
|
|
var ErrNotFound = fmt.Errorf("not found")
|
|
var ErrTooManyRequests = fmt.Errorf("too many requests")
|
|
|
|
var (
|
|
jwtSigningReadKey security.SigningKey
|
|
jwtSigningReadKeyExpires int
|
|
loadJwtConfigOnce sync.Once
|
|
)
|
|
|
|
func loadJwtConfig() {
|
|
v := util.GetViper()
|
|
jwtSigningReadKey = security.SigningKey(v.GetString("jwt.signing.read.key"))
|
|
jwtSigningReadKeyExpires = v.GetInt("jwt.signing.read.expires_after_seconds")
|
|
}
|
|
|
|
func Post(url string, values url.Values) ([]byte, error) {
|
|
r, err := GetGlobalHttpClient().PostForm(url, values)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer r.Body.Close()
|
|
b, err := io.ReadAll(r.Body)
|
|
if r.StatusCode >= 400 {
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s: %d - %s", url, r.StatusCode, string(b))
|
|
} else {
|
|
return nil, fmt.Errorf("%s: %s", url, r.Status)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return b, nil
|
|
}
|
|
|
|
// github.com/seaweedfs/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go
|
|
// may need increasing http.Client.Timeout
|
|
func Get(url string) ([]byte, bool, error) {
|
|
return GetAuthenticated(url, "")
|
|
}
|
|
|
|
func GetAuthenticated(url, jwt string) ([]byte, bool, error) {
|
|
request, err := http.NewRequest(http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, true, err
|
|
}
|
|
maybeAddAuth(request, jwt)
|
|
request.Header.Add("Accept-Encoding", "gzip")
|
|
|
|
response, err := GetGlobalHttpClient().Do(request)
|
|
if err != nil {
|
|
return nil, true, err
|
|
}
|
|
defer CloseResponse(response)
|
|
|
|
var reader io.ReadCloser
|
|
switch response.Header.Get("Content-Encoding") {
|
|
case "gzip":
|
|
reader, err = gzip.NewReader(response.Body)
|
|
if err != nil {
|
|
return nil, true, err
|
|
}
|
|
defer reader.Close()
|
|
default:
|
|
reader = response.Body
|
|
}
|
|
|
|
b, err := io.ReadAll(reader)
|
|
if response.StatusCode >= 400 {
|
|
retryable := response.StatusCode >= 500
|
|
return nil, retryable, fmt.Errorf("%s: %s", url, response.Status)
|
|
}
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
return b, false, nil
|
|
}
|
|
|
|
func Head(url string) (http.Header, error) {
|
|
r, err := GetGlobalHttpClient().Head(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer CloseResponse(r)
|
|
if r.StatusCode >= 400 {
|
|
return nil, fmt.Errorf("%s: %s", url, r.Status)
|
|
}
|
|
return r.Header, nil
|
|
}
|
|
|
|
func maybeAddAuth(req *http.Request, jwt string) {
|
|
if jwt != "" {
|
|
req.Header.Set("Authorization", "BEARER "+string(jwt))
|
|
}
|
|
}
|
|
|
|
func Delete(url string, jwt string) error {
|
|
req, err := http.NewRequest(http.MethodDelete, url, nil)
|
|
maybeAddAuth(req, jwt)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp, e := GetGlobalHttpClient().Do(req)
|
|
if e != nil {
|
|
return e
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err := io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
switch resp.StatusCode {
|
|
case http.StatusNotFound, http.StatusAccepted, http.StatusOK:
|
|
return nil
|
|
}
|
|
m := make(map[string]interface{})
|
|
if e := json.Unmarshal(body, &m); e == nil {
|
|
if s, ok := m["error"].(string); ok {
|
|
return errors.New(s)
|
|
}
|
|
}
|
|
return errors.New(string(body))
|
|
}
|
|
|
|
func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
|
|
req, err := http.NewRequest(http.MethodDelete, url, nil)
|
|
maybeAddAuth(req, jwt)
|
|
if err != nil {
|
|
return
|
|
}
|
|
resp, err := GetGlobalHttpClient().Do(req)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err = io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return
|
|
}
|
|
httpStatus = resp.StatusCode
|
|
return
|
|
}
|
|
|
|
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
|
|
r, err := GetGlobalHttpClient().PostForm(url, values)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer CloseResponse(r)
|
|
if r.StatusCode != 200 {
|
|
return fmt.Errorf("%s: %s", url, r.Status)
|
|
}
|
|
for {
|
|
n, err := r.Body.Read(allocatedBytes)
|
|
if n > 0 {
|
|
eachBuffer(allocatedBytes[:n])
|
|
}
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func GetUrlStream(url string, values url.Values, readFn func(io.Reader) error) error {
|
|
r, err := GetGlobalHttpClient().PostForm(url, values)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer CloseResponse(r)
|
|
if r.StatusCode != 200 {
|
|
return fmt.Errorf("%s: %s", url, r.Status)
|
|
}
|
|
return readFn(r.Body)
|
|
}
|
|
|
|
func DownloadFile(fileUrl string, jwt string, offset ...int64) (filename string, header http.Header, resp *http.Response, e error) {
|
|
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
|
|
if err != nil {
|
|
return "", nil, nil, err
|
|
}
|
|
|
|
maybeAddAuth(req, jwt)
|
|
|
|
var rangeOffset int64
|
|
if len(offset) > 0 {
|
|
rangeOffset = offset[0]
|
|
}
|
|
if rangeOffset > 0 {
|
|
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", rangeOffset))
|
|
}
|
|
|
|
response, err := GetGlobalHttpClient().Do(req)
|
|
if err != nil {
|
|
return "", nil, nil, err
|
|
}
|
|
|
|
if rangeOffset > 0 {
|
|
expected := fmt.Sprintf("bytes %d-", rangeOffset)
|
|
if response.StatusCode != http.StatusPartialContent ||
|
|
!strings.HasPrefix(response.Header.Get("Content-Range"), expected) {
|
|
CloseResponse(response)
|
|
return "", nil, nil, fmt.Errorf("range request %q to %s returned %s with Content-Range %q",
|
|
req.Header.Get("Range"), fileUrl, response.Status, response.Header.Get("Content-Range"))
|
|
}
|
|
}
|
|
|
|
header = response.Header
|
|
contentDisposition := response.Header["Content-Disposition"]
|
|
if len(contentDisposition) > 0 {
|
|
idx := strings.Index(contentDisposition[0], "filename=")
|
|
if idx != -1 {
|
|
filename = contentDisposition[0][idx+len("filename="):]
|
|
filename = strings.Trim(filename, "\"")
|
|
}
|
|
}
|
|
resp = response
|
|
return
|
|
}
|
|
|
|
func Do(req *http.Request) (resp *http.Response, err error) {
|
|
return GetGlobalHttpClient().Do(req)
|
|
}
|
|
|
|
func NormalizeUrl(url string) (string, error) {
|
|
return GetGlobalHttpClient().NormalizeHttpScheme(url)
|
|
}
|
|
|
|
func ReadUrl(ctx context.Context, fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, buf []byte) (int64, error) {
|
|
|
|
if cipherKey != nil {
|
|
var n int
|
|
_, err := readEncryptedUrl(ctx, fileUrl, "", cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) {
|
|
n = copy(buf, data)
|
|
})
|
|
return int64(n), err
|
|
}
|
|
|
|
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
if !isFullChunk {
|
|
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
|
|
} else {
|
|
req.Header.Set("Accept-Encoding", "gzip")
|
|
}
|
|
|
|
r, err := GetGlobalHttpClient().Do(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer CloseResponse(r)
|
|
|
|
if r.StatusCode >= 400 {
|
|
return 0, fmt.Errorf("%s: %s", fileUrl, r.Status)
|
|
}
|
|
|
|
var reader io.ReadCloser
|
|
contentEncoding := r.Header.Get("Content-Encoding")
|
|
switch contentEncoding {
|
|
case "gzip":
|
|
reader, err = gzip.NewReader(r.Body)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer reader.Close()
|
|
default:
|
|
reader = r.Body
|
|
}
|
|
|
|
var (
|
|
i, m int
|
|
n int64
|
|
)
|
|
|
|
// refers to https://github.com/golang/go/blob/master/src/bytes/buffer.go#L199
|
|
// commit id c170b14c2c1cfb2fd853a37add92a82fd6eb4318
|
|
for {
|
|
m, err = reader.Read(buf[i:])
|
|
i += m
|
|
n += int64(m)
|
|
if err == io.EOF {
|
|
return n, nil
|
|
}
|
|
if err != nil {
|
|
return n, err
|
|
}
|
|
if n == int64(len(buf)) {
|
|
break
|
|
}
|
|
}
|
|
// drains the response body to avoid memory leak
|
|
data, _ := io.ReadAll(reader)
|
|
if len(data) != 0 {
|
|
glog.V(1).InfofCtx(ctx, "%s reader has remaining %d bytes", contentEncoding, len(data))
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func ReadUrlAsStream(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) {
|
|
if cipherKey != nil {
|
|
return readEncryptedUrl(ctx, fileUrl, jwt, cipherKey, isContentGzipped, isFullChunk, offset, size, fn)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
maybeAddAuth(req, jwt)
|
|
|
|
if isFullChunk {
|
|
req.Header.Add("Accept-Encoding", "gzip")
|
|
} else {
|
|
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+int64(size)-1))
|
|
}
|
|
request_id.InjectToRequest(ctx, req)
|
|
|
|
r, err := GetGlobalHttpClient().Do(req)
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
defer CloseResponse(r)
|
|
if r.StatusCode >= 400 {
|
|
if r.StatusCode == http.StatusNotFound {
|
|
return true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
|
|
}
|
|
if r.StatusCode == http.StatusTooManyRequests {
|
|
return false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
|
|
}
|
|
retryable = r.StatusCode >= 499
|
|
return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
|
|
}
|
|
|
|
var reader io.ReadCloser
|
|
contentEncoding := r.Header.Get("Content-Encoding")
|
|
switch contentEncoding {
|
|
case "gzip":
|
|
reader, err = gzip.NewReader(r.Body)
|
|
defer reader.Close()
|
|
default:
|
|
reader = r.Body
|
|
}
|
|
|
|
var (
|
|
m int
|
|
)
|
|
buf := mem.Allocate(256 * 1024)
|
|
defer mem.Free(buf)
|
|
|
|
for {
|
|
// Check for context cancellation before each read
|
|
select {
|
|
case <-ctx.Done():
|
|
return false, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
m, err = reader.Read(buf)
|
|
if m > 0 {
|
|
fn(buf[:m])
|
|
}
|
|
if err == io.EOF {
|
|
return false, nil
|
|
}
|
|
if err != nil {
|
|
return true, err
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func readEncryptedUrl(ctx context.Context, fileUrl, jwt string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) {
|
|
encryptedData, retryable, err := GetAuthenticated(fileUrl, jwt)
|
|
if err != nil {
|
|
return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err)
|
|
}
|
|
decryptedData, err := util.Decrypt(encryptedData, util.CipherKey(cipherKey))
|
|
if err != nil {
|
|
return false, fmt.Errorf("decrypt %s: %v", fileUrl, err)
|
|
}
|
|
if isContentCompressed {
|
|
decryptedData, err = util.DecompressData(decryptedData)
|
|
if err != nil {
|
|
glog.V(0).InfofCtx(ctx, "unzip decrypt %s: %v", fileUrl, err)
|
|
}
|
|
}
|
|
if len(decryptedData) < int(offset)+size {
|
|
return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size)
|
|
}
|
|
if isFullChunk {
|
|
fn(decryptedData)
|
|
} else {
|
|
sliceEnd := int(offset) + size
|
|
fn(decryptedData[int(offset):sliceEnd])
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func ReadUrlAsReaderCloser(fileUrl string, jwt string, rangeHeader string) (*http.Response, io.ReadCloser, error) {
|
|
|
|
req, err := http.NewRequest(http.MethodGet, fileUrl, nil)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if rangeHeader != "" {
|
|
req.Header.Add("Range", rangeHeader)
|
|
} else {
|
|
req.Header.Add("Accept-Encoding", "gzip")
|
|
}
|
|
|
|
maybeAddAuth(req, jwt)
|
|
|
|
r, err := GetGlobalHttpClient().Do(req)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
if r.StatusCode >= 400 {
|
|
CloseResponse(r)
|
|
return nil, nil, fmt.Errorf("%s: %s", fileUrl, r.Status)
|
|
}
|
|
|
|
var reader io.ReadCloser
|
|
contentEncoding := r.Header.Get("Content-Encoding")
|
|
switch contentEncoding {
|
|
case "gzip":
|
|
reader, err = gzip.NewReader(r.Body)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
default:
|
|
reader = r.Body
|
|
}
|
|
|
|
return r, reader, nil
|
|
}
|
|
|
|
func CloseResponse(resp *http.Response) {
|
|
if resp == nil || resp.Body == nil {
|
|
return
|
|
}
|
|
reader := &CountingReader{reader: resp.Body}
|
|
io.Copy(io.Discard, reader)
|
|
resp.Body.Close()
|
|
if reader.BytesRead > 0 {
|
|
glog.V(1).Infof("response leftover %d bytes", reader.BytesRead)
|
|
}
|
|
}
|
|
|
|
func CloseRequest(req *http.Request) {
|
|
reader := &CountingReader{reader: req.Body}
|
|
io.Copy(io.Discard, reader)
|
|
req.Body.Close()
|
|
if reader.BytesRead > 0 {
|
|
glog.V(1).Infof("request leftover %d bytes", reader.BytesRead)
|
|
}
|
|
}
|
|
|
|
type CountingReader struct {
|
|
reader io.Reader
|
|
BytesRead int
|
|
}
|
|
|
|
func (r *CountingReader) Read(p []byte) (n int, err error) {
|
|
n, err = r.reader.Read(p)
|
|
r.BytesRead += n
|
|
return n, err
|
|
}
|
|
|
|
func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, fileId string) (n int, err error) {
|
|
|
|
loadJwtConfigOnce.Do(loadJwtConfig)
|
|
var jwt security.EncodedJwt
|
|
if len(jwtSigningReadKey) > 0 {
|
|
jwt = security.GenJwtForVolumeServer(
|
|
jwtSigningReadKey,
|
|
jwtSigningReadKeyExpires,
|
|
fileId,
|
|
)
|
|
}
|
|
|
|
// For unencrypted, non-gzipped full chunks, use direct buffer read
|
|
// This avoids the 64KB intermediate buffer and callback overhead
|
|
if cipherKey == nil && !isGzipped && isFullChunk {
|
|
return retriedFetchChunkDataDirect(ctx, buffer, urlStrings, string(jwt))
|
|
}
|
|
|
|
var shouldRetry bool
|
|
|
|
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
|
|
// Check for context cancellation before starting retry loop
|
|
select {
|
|
case <-ctx.Done():
|
|
return n, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
for _, urlString := range urlStrings {
|
|
// Check for context cancellation before each volume server request
|
|
select {
|
|
case <-ctx.Done():
|
|
return n, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
n = 0
|
|
if strings.Contains(urlString, "%") {
|
|
urlString = url.PathEscape(urlString)
|
|
}
|
|
shouldRetry, err = ReadUrlAsStream(ctx, urlString+"?readDeleted=true", string(jwt), cipherKey, isGzipped, isFullChunk, offset, len(buffer), func(data []byte) {
|
|
// Check for context cancellation during data processing
|
|
select {
|
|
case <-ctx.Done():
|
|
// Stop processing data when context is cancelled
|
|
return
|
|
default:
|
|
}
|
|
|
|
if n < len(buffer) {
|
|
x := copy(buffer[n:], data)
|
|
n += x
|
|
}
|
|
})
|
|
if !shouldRetry {
|
|
break
|
|
}
|
|
if err != nil {
|
|
glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
if err != nil && shouldRetry {
|
|
glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
|
|
// Sleep with proper context cancellation and timer cleanup
|
|
timer := time.NewTimer(waitTime)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return n, ctx.Err()
|
|
case <-timer.C:
|
|
// Continue with retry
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return n, err
|
|
|
|
}
|
|
|
|
// retriedFetchChunkDataDirect reads chunk data directly into the buffer without
|
|
// intermediate buffering. This reduces memory copies and improves throughput
|
|
// for large chunk reads.
|
|
func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings []string, jwt string) (n int, err error) {
|
|
var shouldRetry bool
|
|
|
|
for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
for _, urlString := range urlStrings {
|
|
select {
|
|
case <-ctx.Done():
|
|
return 0, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
|
|
if err == nil {
|
|
return n, nil
|
|
}
|
|
if !shouldRetry {
|
|
break
|
|
}
|
|
glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
|
|
}
|
|
|
|
if err != nil && shouldRetry {
|
|
glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
|
|
timer := time.NewTimer(waitTime)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
return 0, ctx.Err()
|
|
case <-timer.C:
|
|
}
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
// readUrlDirectToBuffer reads HTTP response directly into the provided buffer,
|
|
// avoiding intermediate buffer allocations and copies.
|
|
func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []byte) (n int, retryable bool, err error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
|
|
if err != nil {
|
|
return 0, false, err
|
|
}
|
|
maybeAddAuth(req, jwt)
|
|
request_id.InjectToRequest(ctx, req)
|
|
|
|
r, err := GetGlobalHttpClient().Do(req)
|
|
if err != nil {
|
|
return 0, true, err
|
|
}
|
|
defer CloseResponse(r)
|
|
|
|
if r.StatusCode >= 400 {
|
|
if r.StatusCode == http.StatusNotFound {
|
|
return 0, true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
|
|
}
|
|
if r.StatusCode == http.StatusTooManyRequests {
|
|
return 0, false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
|
|
}
|
|
retryable = r.StatusCode >= 499
|
|
return 0, retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
|
|
}
|
|
|
|
// Read directly into the buffer without intermediate copying
|
|
// This is significantly faster for large chunks (16MB+)
|
|
var totalRead int
|
|
for totalRead < len(buffer) {
|
|
select {
|
|
case <-ctx.Done():
|
|
return totalRead, false, ctx.Err()
|
|
default:
|
|
}
|
|
|
|
m, readErr := r.Body.Read(buffer[totalRead:])
|
|
totalRead += m
|
|
if readErr != nil {
|
|
if readErr == io.EOF {
|
|
// Return io.ErrUnexpectedEOF if we haven't filled the buffer
|
|
// This prevents silent data corruption from truncated responses
|
|
if totalRead < len(buffer) {
|
|
return totalRead, true, io.ErrUnexpectedEOF
|
|
}
|
|
return totalRead, false, nil
|
|
}
|
|
return totalRead, true, readErr
|
|
}
|
|
}
|
|
|
|
return totalRead, false, nil
|
|
}
|