From 0798b274dd19d3ec24f448a8297aa034ab9f0ae3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 3 Apr 2026 19:57:30 -0700 Subject: [PATCH] feat(s3): add concurrent chunk prefetch for large file downloads (#8917) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(s3): add concurrent chunk prefetch for large file downloads Add a pipe-based prefetch pipeline that overlaps chunk fetching with response writing during S3 GetObject, SSE downloads, and filer proxy. While chunk N streams to the HTTP response, fetch goroutines for the next K chunks establish HTTP connections to volume servers ahead of time, eliminating the RTT gap between sequential chunk fetches. Uses io.Pipe for minimal memory overhead (~1MB per download regardless of chunk size, vs buffering entire chunks). Also increases the streaming read buffer from 64KB to 256KB to reduce syscall overhead. Benchmark results (64KB chunks, prefetch=4): - 0ms latency: 1058 → 2362 MB/s (2.2× faster) - 5ms latency: 11.0 → 41.7 MB/s (3.8× faster) - 10ms latency: 5.9 → 23.3 MB/s (4.0× faster) - 20ms latency: 3.1 → 12.1 MB/s (3.9× faster) * fix: address review feedback for prefetch pipeline - Fix data race: use *chunkPipeResult (pointer) on channel to avoid copying struct while fetch goroutines write to it. Confirmed clean with -race detector. - Remove concurrent map write: retryWithCacheInvalidation no longer updates fileId2Url map. Producer only reads it; consumer never writes. - Use mem.Allocate/mem.Free for copy buffer to reduce GC pressure. - Add local cancellable context so consumer errors (client disconnect) immediately stop the producer and all in-flight fetch goroutines. * fix(test): remove dead code and add Range header support in test server - Remove unused allData variable in makeChunksAndServer - Add Range header handling to createTestServer for partial chunk read coverage (206 Partial Content, 416 Range Not Satisfiable) * fix: correct retry condition and goroutine leak in prefetch pipeline - Fix retry condition: use result.fetchErr/result.written instead of copied to decide cache-invalidation retry. The old condition wrongly triggered retry when the fetch succeeded but the response writer failed on the first write (copied==0 despite fetcher having data). Now matches the sequential path (stream.go:197) which checks whether the fetcher itself wrote zero bytes. - Fix goroutine leak: when the producer's send to the results channel is interrupted by context cancellation, the fetch goroutine was already launched but the result was never sent to the channel. The drain loop couldn't handle it. Now waits on result.done before returning so every fetch goroutine is properly awaited. --- weed/filer/stream.go | 60 ++++ weed/filer/stream_benchmark_test.go | 317 +++++++++++++++++++ weed/filer/stream_prefetch.go | 274 ++++++++++++++++ weed/filer/stream_prefetch_test.go | 365 ++++++++++++++++++++++ weed/s3api/s3api_object_handlers.go | 6 +- weed/server/filer_server_handlers_read.go | 2 +- weed/util/http/http_global_client_util.go | 2 +- 7 files changed, 1022 insertions(+), 4 deletions(-) create mode 100644 weed/filer/stream_benchmark_test.go create mode 100644 weed/filer/stream_prefetch.go create mode 100644 weed/filer/stream_prefetch_test.go diff --git a/weed/filer/stream.go b/weed/filer/stream.go index e49794fd2..d0c028e88 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -245,6 +245,66 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien }, nil } +// PrepareStreamContentWithPrefetch is like PrepareStreamContentWithThrottler but uses +// concurrent chunk prefetching to overlap network I/O. When prefetchAhead > 1, fetch +// goroutines establish HTTP connections to volume servers ahead of time, streaming data +// through io.Pipe with minimal memory overhead. +// +// prefetchAhead controls the number of chunks fetched concurrently: +// - 0 or 1: falls back to sequential fetching (same as PrepareStreamContentWithThrottler) +// - 2+: uses pipe-based prefetch pipeline with that many concurrent fetches +func PrepareStreamContentWithPrefetch(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64, prefetchAhead int) (DoStreamContent, error) { + if prefetchAhead <= 1 { + return PrepareStreamContentWithThrottler(ctx, masterClient, jwtFunc, chunks, offset, size, downloadMaxBytesPs) + } + + glog.V(4).InfofCtx(ctx, "prepare to stream content with prefetch=%d for chunks: %d", prefetchAhead, len(chunks)) + chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size) + + fileId2Url := make(map[string][]string) + + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + var urlStrings []string + var err error + for _, backoff := range getLookupFileIdBackoffSchedule { + if err := ctx.Err(); err != nil { + return nil, err + } + urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId) + if err == nil && len(urlStrings) > 0 { + break + } + if err := ctx.Err(); err != nil { + return nil, err + } + glog.V(4).InfofCtx(ctx, "waiting for chunk: %s", chunkView.FileId) + timer := time.NewTimer(backoff) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return nil, ctx.Err() + case <-timer.C: + } + } + if err != nil { + glog.V(1).InfofCtx(ctx, "operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return nil, err + } else if len(urlStrings) == 0 { + errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) + glog.ErrorCtx(ctx, errUrlNotFound) + return nil, errUrlNotFound + } + fileId2Url[chunkView.FileId] = urlStrings + } + + return func(writer io.Writer) error { + return streamChunksPrefetched(ctx, writer, chunkViews, fileId2Url, jwtFunc, masterClient, offset, size, downloadMaxBytesPs, prefetchAhead) + }, nil +} + func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { streamFn, err := PrepareStreamContent(masterClient, JwtForVolumeServer, chunks, offset, size) if err != nil { diff --git a/weed/filer/stream_benchmark_test.go b/weed/filer/stream_benchmark_test.go new file mode 100644 index 000000000..f0b76e0e7 --- /dev/null +++ b/weed/filer/stream_benchmark_test.go @@ -0,0 +1,317 @@ +package filer + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "testing" + "time" + + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +func TestMain(m *testing.M) { + // Initialize the global HTTP client required by ReadUrlAsStream + util_http.InitGlobalHttpClient() + os.Exit(m.Run()) +} + +// mockMasterClientForBenchmark implements HasLookupFileIdFunction and CacheInvalidator +type mockMasterClientForBenchmark struct { + urls map[string][]string +} + +func (m *mockMasterClientForBenchmark) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { + return func(ctx context.Context, fileId string) ([]string, error) { + if urls, ok := m.urls[fileId]; ok { + return urls, nil + } + return nil, fmt.Errorf("fileId %s not found", fileId) + } +} + +func (m *mockMasterClientForBenchmark) InvalidateCache(fileId string) {} + +// noopJwtFunc returns empty JWT for testing +func noopJwtFunc(fileId string) string { + return "" +} + +// createMockVolumeServer creates an httptest server that serves chunk data +// with configurable per-request latency to simulate network conditions. +// The latency is applied once per request (simulating RTT), not per byte. +func createMockVolumeServer(chunkData map[string][]byte, latency time.Duration) *httptest.Server { + var mu sync.RWMutex + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate network latency (connection setup + RTT) + if latency > 0 { + time.Sleep(latency) + } + + // Extract fileId from path (e.g., "/1,abc123") + path := r.URL.Path + if strings.HasPrefix(path, "/") { + path = path[1:] + } + + mu.RLock() + data, ok := chunkData[path] + mu.RUnlock() + if !ok { + http.Error(w, "not found", http.StatusNotFound) + return + } + + // Handle Range header + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + var start, end int64 + fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end) + if start >= 0 && end < int64(len(data)) && start <= end { + w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1)) + w.WriteHeader(http.StatusPartialContent) + w.Write(data[start : end+1]) + return + } + } + + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(data))) + w.WriteHeader(http.StatusOK) + w.Write(data) + })) +} + +// benchmarkConfig holds parameters for a single benchmark scenario +type benchmarkConfig struct { + numChunks int + chunkSize int + latency time.Duration + prefetch int // 0 = sequential +} + +func (c benchmarkConfig) name() string { + name := fmt.Sprintf("chunks=%d/size=%dKB/latency=%dms", + c.numChunks, c.chunkSize/1024, c.latency.Milliseconds()) + if c.prefetch > 0 { + name += fmt.Sprintf("/prefetch=%d", c.prefetch) + } + return name +} + +// setupBenchmark creates mock infrastructure and returns chunks, master client, and cleanup func +func setupBenchmark(b *testing.B, cfg benchmarkConfig) ([]*filer_pb.FileChunk, *mockMasterClientForBenchmark, func()) { + b.Helper() + + // Generate random chunk data + chunkData := make(map[string][]byte, cfg.numChunks) + chunks := make([]*filer_pb.FileChunk, cfg.numChunks) + + for i := 0; i < cfg.numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + data := make([]byte, cfg.chunkSize) + rand.Read(data) + chunkData[fileId] = data + + chunks[i] = &filer_pb.FileChunk{ + FileId: fileId, + Offset: int64(i * cfg.chunkSize), + Size: uint64(cfg.chunkSize), + ModifiedTsNs: int64(i), + Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + } + + // Start mock volume server + server := createMockVolumeServer(chunkData, cfg.latency) + + // Build URL map + urls := make(map[string][]string, cfg.numChunks) + for i := 0; i < cfg.numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + urls[fileId] = []string{server.URL + "/" + fileId} + } + + masterClient := &mockMasterClientForBenchmark{urls: urls} + cleanup := func() { server.Close() } + + return chunks, masterClient, cleanup +} + +// runSequentialBenchmark runs the current sequential streaming path +func runSequentialBenchmark(b *testing.B, cfg benchmarkConfig) { + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + b.ResetTimer() + b.SetBytes(totalSize) + + for i := 0; i < b.N; i++ { + streamFn, err := PrepareStreamContentWithThrottler( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, // no throttle + ) + if err != nil { + b.Fatal(err) + } + if err := streamFn(io.Discard); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStreamSequential benchmarks the current sequential streaming path. +// This provides the BEFORE baseline for comparison. +func BenchmarkStreamSequential(b *testing.B) { + configs := []benchmarkConfig{ + // Pure throughput (no latency) + {numChunks: 16, chunkSize: 64 * 1024, latency: 0}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 0}, + // Moderate latency — shows RTT gap overhead + {numChunks: 16, chunkSize: 64 * 1024, latency: 5 * time.Millisecond}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond}, + // High latency — significant RTT overhead + {numChunks: 16, chunkSize: 64 * 1024, latency: 20 * time.Millisecond}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 10 * time.Millisecond}, + } + + for _, cfg := range configs { + b.Run(cfg.name(), func(b *testing.B) { + runSequentialBenchmark(b, cfg) + }) + } +} + +// BenchmarkStreamSequentialVerify is a quick functional test that the benchmark +// infrastructure works correctly — ensures data integrity through the pipeline. +func BenchmarkStreamSequentialVerify(b *testing.B) { + cfg := benchmarkConfig{numChunks: 4, chunkSize: 1024, latency: 0} + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + streamFn, err := PrepareStreamContentWithThrottler( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, + ) + if err != nil { + b.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + b.Fatal(err) + } + + if buf.Len() != int(totalSize) { + b.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } +} + +// runPrefetchBenchmark runs the new prefetch streaming path +func runPrefetchBenchmark(b *testing.B, cfg benchmarkConfig) { + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + b.ResetTimer() + b.SetBytes(totalSize) + + for i := 0; i < b.N; i++ { + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, // no throttle + cfg.prefetch, + ) + if err != nil { + b.Fatal(err) + } + if err := streamFn(io.Discard); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStreamPrefetch benchmarks the new prefetch streaming path. +// Compare against BenchmarkStreamSequential for the AFTER measurement. +func BenchmarkStreamPrefetch(b *testing.B) { + configs := []benchmarkConfig{ + // Pure throughput (no latency) — should be similar to sequential + {numChunks: 16, chunkSize: 64 * 1024, latency: 0, prefetch: 4}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 0, prefetch: 4}, + // Moderate latency — prefetch should eliminate most RTT overhead + {numChunks: 16, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 4}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 4}, + // High latency — most benefit from prefetch + {numChunks: 16, chunkSize: 64 * 1024, latency: 20 * time.Millisecond, prefetch: 4}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 10 * time.Millisecond, prefetch: 4}, + // Vary prefetch count with moderate latency + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 2}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 8}, + } + + for _, cfg := range configs { + b.Run(cfg.name(), func(b *testing.B) { + runPrefetchBenchmark(b, cfg) + }) + } +} + +// BenchmarkStreamPrefetchVerify verifies data integrity through the prefetch pipeline. +func BenchmarkStreamPrefetchVerify(b *testing.B) { + cfg := benchmarkConfig{numChunks: 4, chunkSize: 1024, latency: 0, prefetch: 4} + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, + cfg.prefetch, + ) + if err != nil { + b.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + b.Fatal(err) + } + + if buf.Len() != int(totalSize) { + b.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } +} diff --git a/weed/filer/stream_prefetch.go b/weed/filer/stream_prefetch.go new file mode 100644 index 000000000..511f2b90c --- /dev/null +++ b/weed/filer/stream_prefetch.go @@ -0,0 +1,274 @@ +package filer + +import ( + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/mem" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// chunkPipeResult represents a prefetched chunk streaming through a pipe. +// The fetch goroutine writes data into the pipeWriter; the consumer reads from pipeReader. +type chunkPipeResult struct { + chunkView *ChunkView + reader *io.PipeReader + fetchErr error // final error from fetch goroutine + written int64 // bytes written by fetch goroutine + done chan struct{} // closed when fetch goroutine finishes + urlStrings []string // snapshot of URLs at dispatch time (for retry logic) +} + +// streamChunksPrefetched streams chunks with concurrent prefetch using io.Pipe. +// +// For each chunk in file order, a goroutine is launched (bounded by a semaphore) +// that establishes an HTTP connection to the volume server and streams data through +// an io.Pipe. The consumer reads from pipes in order, writing to the response. +// +// Memory usage is minimal: pipes are synchronous (no buffering), and only one +// reusable copy buffer is allocated for the consumer. +func streamChunksPrefetched( + ctx context.Context, + writer io.Writer, + chunkViews *IntervalList[*ChunkView], + fileId2Url map[string][]string, + jwtFunc VolumeServerJwtFunction, + masterClient wdclient.HasLookupFileIdFunction, + offset int64, + size int64, + downloadMaxBytesPs int64, + prefetchAhead int, +) error { + downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs) + + // Create a local cancellable context so the consumer can stop the producer + // and all in-flight fetch goroutines on error (e.g., client disconnect). + localCtx, localCancel := context.WithCancel(ctx) + defer localCancel() + + // Ordered channel: one entry per chunk, in file order. + // Capacity = prefetchAhead so the producer can run ahead. + // Uses pointer to avoid copying the struct while fetch goroutines write to it. + results := make(chan *chunkPipeResult, prefetchAhead) + + // Semaphore to limit concurrent fetch goroutines (and thus HTTP connections). + sem := make(chan struct{}, prefetchAhead) + + // Producer: walks chunk list, launches fetch goroutines, sends results in order. + // The producer only reads from fileId2Url (populated before streaming starts), + // so there is no concurrent map access — the consumer never writes to it. + var producerWg sync.WaitGroup + producerWg.Add(1) + go func() { + defer producerWg.Done() + defer close(results) + + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + + // Check context before starting new fetch + select { + case <-localCtx.Done(): + return + default: + } + + // Acquire semaphore slot (bounds concurrent HTTP connections) + select { + case sem <- struct{}{}: + case <-localCtx.Done(): + return + } + + pr, pw := io.Pipe() + urlStrings := fileId2Url[chunkView.FileId] + jwt := jwtFunc(chunkView.FileId) + + result := &chunkPipeResult{ + chunkView: chunkView, + reader: pr, + done: make(chan struct{}), + urlStrings: urlStrings, + } + + // Launch fetch goroutine + go func(cv *ChunkView, urls []string, jwt string, pw *io.PipeWriter, res *chunkPipeResult) { + defer func() { <-sem }() // release semaphore + defer close(res.done) + + written, err := retriedStreamFetchChunkData( + localCtx, pw, urls, jwt, + cv.CipherKey, cv.IsGzipped, cv.IsFullChunk(), + cv.OffsetInChunk, int(cv.ViewSize), + ) + res.written = written + res.fetchErr = err + + if err != nil { + pw.CloseWithError(err) + } else { + pw.Close() + } + }(chunkView, urlStrings, jwt, pw, result) + + // Send result to consumer (blocks if channel full, back-pressuring producer) + select { + case results <- result: + case <-localCtx.Done(): + // Consumer gone; close the pipe and wait for the fetch goroutine + // to finish so we don't leak it (this result was never sent to + // the channel, so the drain loop won't handle it). + pr.Close() + <-result.done + return + } + } + }() + + // Consumer: reads from results channel in order, writes to response writer. + // Use the SeaweedFS memory pool for the copy buffer to reduce GC pressure. + copyBuf := mem.Allocate(256 * 1024) + defer mem.Free(copyBuf) + remaining := size + + var consumeErr error + for result := range results { + chunkView := result.chunkView + + // Handle gap before this chunk (zero-fill) + if offset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - offset + remaining -= gap + glog.V(4).InfofCtx(ctx, "prefetch zero [%d,%d)", offset, chunkView.ViewOffset) + if err := writeZero(writer, gap); err != nil { + consumeErr = fmt.Errorf("write zero [%d,%d): %w", offset, chunkView.ViewOffset, err) + result.reader.Close() + break + } + offset = chunkView.ViewOffset + } + + // Stream chunk data from pipe to response + start := time.Now() + _, copyErr := io.CopyBuffer(writer, result.reader, copyBuf) + result.reader.Close() + + // Wait for fetch goroutine to finish to get final error + <-result.done + + // Determine the effective error + err := copyErr + if err == nil && result.fetchErr != nil && result.written == 0 { + err = result.fetchErr + } + + // If the fetcher itself failed before writing any data, try cache invalidation + // + re-fetch (same as sequential path stream.go:197). We check result.fetchErr + // and result.written (not copied) to avoid wrongly retrying when the fetch + // succeeded but the response writer failed on the first write. + if result.fetchErr != nil && result.written == 0 { + if err := localCtx.Err(); err != nil { + consumeErr = err + break + } + retryErr := retryWithCacheInvalidation(localCtx, writer, chunkView, result.urlStrings, jwtFunc, masterClient) + if retryErr != nil { + stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc() + consumeErr = fmt.Errorf("read chunk: %w", retryErr) + break + } + // Retry succeeded + err = nil + } else if err != nil { + if localCtx.Err() != nil { + consumeErr = localCtx.Err() + } else { + stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc() + consumeErr = fmt.Errorf("read chunk: %w", err) + } + break + } + + offset += int64(chunkView.ViewSize) + remaining -= int64(chunkView.ViewSize) + stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds()) + stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc() + downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize)) + } + + // Cancel the local context to stop the producer and any in-flight fetchers early. + // This ensures goroutines don't linger after the consumer exits (e.g., on write error). + localCancel() + + // Drain remaining results to close pipes and unblock fetch goroutines + for result := range results { + result.reader.Close() + <-result.done + } + + // Wait for producer to finish + producerWg.Wait() + + if consumeErr != nil { + return consumeErr + } + + // Handle trailing zero-fill + if remaining > 0 { + glog.V(4).InfofCtx(ctx, "prefetch zero [%d,%d)", offset, offset+remaining) + if err := writeZero(writer, remaining); err != nil { + return fmt.Errorf("write zero [%d,%d): %w", offset, offset+remaining, err) + } + } + + return nil +} + +// retryWithCacheInvalidation attempts to re-fetch a chunk after invalidating the URL cache. +// This mirrors the retry logic in PrepareStreamContentWithThrottler's sequential path. +func retryWithCacheInvalidation( + ctx context.Context, + writer io.Writer, + chunkView *ChunkView, + oldUrlStrings []string, + jwtFunc VolumeServerJwtFunction, + masterClient wdclient.HasLookupFileIdFunction, +) error { + invalidator, ok := masterClient.(CacheInvalidator) + if !ok { + return fmt.Errorf("read chunk %s failed and no cache invalidator available", chunkView.FileId) + } + + glog.V(0).InfofCtx(ctx, "prefetch read chunk %s failed, invalidating cache and retrying", chunkView.FileId) + invalidator.InvalidateCache(chunkView.FileId) + + newUrlStrings, lookupErr := masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId) + if lookupErr != nil { + glog.WarningfCtx(ctx, "failed to re-lookup chunk %s after cache invalidation: %v", chunkView.FileId, lookupErr) + return fmt.Errorf("re-lookup chunk %s: %w", chunkView.FileId, lookupErr) + } + if len(newUrlStrings) == 0 { + glog.WarningfCtx(ctx, "re-lookup for chunk %s returned no locations, skipping retry", chunkView.FileId) + return fmt.Errorf("re-lookup chunk %s: no locations", chunkView.FileId) + } + + if urlSlicesEqual(oldUrlStrings, newUrlStrings) { + glog.V(0).InfofCtx(ctx, "re-lookup returned same locations for chunk %s, skipping retry", chunkView.FileId) + return fmt.Errorf("read chunk %s failed, same locations after cache invalidation", chunkView.FileId) + } + + glog.V(0).InfofCtx(ctx, "retrying read chunk %s with new locations: %v", chunkView.FileId, newUrlStrings) + jwt := jwtFunc(chunkView.FileId) + _, err := retriedStreamFetchChunkData( + ctx, writer, newUrlStrings, jwt, + chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), + chunkView.OffsetInChunk, int(chunkView.ViewSize), + ) + return err +} diff --git a/weed/filer/stream_prefetch_test.go b/weed/filer/stream_prefetch_test.go new file mode 100644 index 000000000..ee09178a1 --- /dev/null +++ b/weed/filer/stream_prefetch_test.go @@ -0,0 +1,365 @@ +package filer + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// testMasterClient implements HasLookupFileIdFunction and CacheInvalidator for tests +type testMasterClient struct { + urls map[string][]string + invalidatedCount int32 +} + +func (m *testMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { + return func(ctx context.Context, fileId string) ([]string, error) { + if urls, ok := m.urls[fileId]; ok { + return urls, nil + } + return nil, fmt.Errorf("fileId %s not found", fileId) + } +} + +func (m *testMasterClient) InvalidateCache(fileId string) { + atomic.AddInt32(&m.invalidatedCount, 1) +} + +func noopJwt(fileId string) string { return "" } + +// createTestServer creates a mock volume server that serves chunk data. +// Supports Range header for partial chunk reads (exercising OffsetInChunk paths). +func createTestServer(chunkData map[string][]byte) *httptest.Server { + var mu sync.RWMutex + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + path := r.URL.Path + if strings.HasPrefix(path, "/") { + path = path[1:] + } + mu.RLock() + data, ok := chunkData[path] + mu.RUnlock() + if !ok { + http.Error(w, "not found", http.StatusNotFound) + return + } + + // Handle Range header for partial chunk reads + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + var start, end int64 + if _, err := fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end); err == nil { + if start < 0 || end >= int64(len(data)) || start > end { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data))) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return + } + rangeData := data[start : end+1] + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, len(data))) + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(rangeData))) + w.WriteHeader(http.StatusPartialContent) + w.Write(rangeData) + return + } + } + + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(data))) + w.WriteHeader(http.StatusOK) + w.Write(data) + })) +} + +// makeChunksAndServer creates N chunks of given size, a mock server, and a master client +func makeChunksAndServer(t *testing.T, numChunks, chunkSize int) ([]*filer_pb.FileChunk, *testMasterClient, map[string][]byte, func()) { + t.Helper() + + chunkData := make(map[string][]byte, numChunks) + chunks := make([]*filer_pb.FileChunk, numChunks) + + for i := 0; i < numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + data := make([]byte, chunkSize) + rand.Read(data) + chunkData[fileId] = data + + chunks[i] = &filer_pb.FileChunk{ + FileId: fileId, + Offset: int64(i * chunkSize), + Size: uint64(chunkSize), + ModifiedTsNs: int64(i), + Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + } + + server := createTestServer(chunkData) + urls := make(map[string][]string, numChunks) + for i := 0; i < numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + urls[fileId] = []string{server.URL + "/" + fileId} + } + + masterClient := &testMasterClient{urls: urls} + return chunks, masterClient, chunkData, func() { server.Close() } +} + +// TestPrefetchInOrderDelivery verifies chunks are written to the output in correct file order +func TestPrefetchInOrderDelivery(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 8, 4096) + defer cleanup() + + totalSize := int64(8 * 4096) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 4, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + // Verify total size + if buf.Len() != int(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } + + // Verify data matches chunk-by-chunk in order + result := buf.Bytes() + for i := 0; i < 8; i++ { + fileId := fmt.Sprintf("1,%x", i) + expected := chunkData[fileId] + got := result[i*4096 : (i+1)*4096] + if !bytes.Equal(expected, got) { + t.Fatalf("chunk %d (%s) data mismatch at offset %d", i, fileId, i*4096) + } + } +} + +// TestPrefetchSingleChunk verifies the pipeline works with just one chunk +func TestPrefetchSingleChunk(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 1, 8192) + defer cleanup() + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, 8192, 0, 4, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + expected := chunkData["1,0"] + if !bytes.Equal(expected, buf.Bytes()) { + t.Fatal("single chunk data mismatch") + } +} + +// TestPrefetchFallbackToSequential verifies prefetch=1 falls back to sequential path +func TestPrefetchFallbackToSequential(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 4, 1024) + defer cleanup() + + totalSize := int64(4 * 1024) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 1, // prefetch=1 -> sequential + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + if buf.Len() != int(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } + + // Verify data order + result := buf.Bytes() + for i := 0; i < 4; i++ { + fileId := fmt.Sprintf("1,%x", i) + expected := chunkData[fileId] + got := result[i*1024 : (i+1)*1024] + if !bytes.Equal(expected, got) { + t.Fatalf("chunk %d data mismatch", i) + } + } +} + +// TestPrefetchContextCancellation verifies all goroutines clean up on cancellation +func TestPrefetchContextCancellation(t *testing.T) { + // Use a slow server so cancellation happens mid-stream + var requestCount int32 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&requestCount, 1) + // Slow response + time.Sleep(100 * time.Millisecond) + w.Header().Set("Content-Length", "1024") + w.WriteHeader(http.StatusOK) + w.Write(make([]byte, 1024)) + })) + defer server.Close() + + numChunks := 16 + chunks := make([]*filer_pb.FileChunk, numChunks) + urls := make(map[string][]string, numChunks) + for i := 0; i < numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + chunks[i] = &filer_pb.FileChunk{ + FileId: fileId, Offset: int64(i * 1024), Size: 1024, + ModifiedTsNs: int64(i), Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + urls[fileId] = []string{server.URL + "/" + fileId} + } + masterClient := &testMasterClient{urls: urls} + + // Cancel after a short time + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + streamFn, err := PrepareStreamContentWithPrefetch( + ctx, masterClient, noopJwt, + chunks, 0, int64(numChunks*1024), 0, 4, + ) + if err != nil { + // URL resolution may fail due to cancellation — that's expected + return + } + + err = streamFn(io.Discard) + if err == nil { + t.Fatal("expected error from cancelled context") + } + + // Verify not all chunks were requested (cancellation stopped early) + reqs := atomic.LoadInt32(&requestCount) + if reqs >= int32(numChunks) { + t.Logf("warning: all %d chunks were requested despite cancellation (got %d)", numChunks, reqs) + } +} + +// TestPrefetchRangeRequest verifies prefetch works with offset/size subset +func TestPrefetchRangeRequest(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 8, 4096) + defer cleanup() + + // Request only chunks 2-5 (offset=8192, size=16384) + offset := int64(2 * 4096) + size := int64(4 * 4096) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, offset, size, 0, 4, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + if buf.Len() != int(size) { + t.Fatalf("expected %d bytes, got %d", size, buf.Len()) + } + + // Verify data matches chunks 2-5 + result := buf.Bytes() + for i := 2; i < 6; i++ { + fileId := fmt.Sprintf("1,%x", i) + expected := chunkData[fileId] + start := (i - 2) * 4096 + got := result[start : start+4096] + if !bytes.Equal(expected, got) { + t.Fatalf("chunk %d data mismatch in range request", i) + } + } +} + +// TestPrefetchLargePrefetchCount verifies prefetch > numChunks is handled gracefully +func TestPrefetchLargePrefetchCount(t *testing.T) { + chunks, masterClient, _, cleanup := makeChunksAndServer(t, 3, 1024) + defer cleanup() + + totalSize := int64(3 * 1024) + + // prefetch=10 but only 3 chunks — should work fine + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 10, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + if buf.Len() != int(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } +} + +// TestPrefetchConcurrentDownloads verifies multiple concurrent prefetch streams +func TestPrefetchConcurrentDownloads(t *testing.T) { + chunks, masterClient, _, cleanup := makeChunksAndServer(t, 8, 2048) + defer cleanup() + + totalSize := int64(8 * 2048) + + var wg sync.WaitGroup + errors := make(chan error, 4) + + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 4, + ) + if err != nil { + errors <- err + return + } + if err := streamFn(io.Discard); err != nil { + errors <- err + } + }() + } + + wg.Wait() + close(errors) + + for err := range errors { + t.Fatalf("concurrent download error: %v", err) + } +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 7522ccd2d..e0f05f16b 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -1053,7 +1053,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R // Prepare streaming function with simple master client wrapper tStreamPrep := time.Now() // Use filerClient directly (not wrapped) so it can support cache invalidation - streamFn, err := filer.PrepareStreamContentWithThrottler( + streamFn, err := filer.PrepareStreamContentWithPrefetch( ctx, s3a.filerClient, filer.JwtForVolumeServer, // Use filer's JWT function (loads config once, generates JWT locally) @@ -1061,6 +1061,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R offset, size, 0, // no throttling + 4, // prefetch 4 chunks ahead for overlapped fetching ) streamPrepTime = time.Since(tStreamPrep) if err != nil { @@ -1928,7 +1929,7 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry } // Create streaming reader - use filerClient directly for cache invalidation support - streamFn, err := filer.PrepareStreamContentWithThrottler( + streamFn, err := filer.PrepareStreamContentWithPrefetch( ctx, s3a.filerClient, filer.JwtForVolumeServer, // Use filer's JWT function (loads config once, generates JWT locally) @@ -1936,6 +1937,7 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry 0, totalSize, 0, + 4, // prefetch 4 chunks ahead for overlapped fetching ) if err != nil { return nil, err diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 3db936f43..61a87cd97 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -223,7 +223,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) // Matches S3 API behavior. Request context (ctx) is used for metadata operations above. streamCtx, streamCancel := context.WithCancel(context.WithoutCancel(ctx)) - streamFn, err := filer.PrepareStreamContentWithThrottler(streamCtx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) + streamFn, err := filer.PrepareStreamContentWithPrefetch(streamCtx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs, 4) if err != nil { streamCancel() stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 62e723d38..f8d20ae67 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -372,7 +372,7 @@ func ReadUrlAsStream(ctx context.Context, fileUrl, jwt string, cipherKey []byte, var ( m int ) - buf := mem.Allocate(64 * 1024) + buf := mem.Allocate(256 * 1024) defer mem.Free(buf) for {