diff --git a/weed/filer/stream.go b/weed/filer/stream.go index e49794fd2..d0c028e88 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -245,6 +245,66 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien }, nil } +// PrepareStreamContentWithPrefetch is like PrepareStreamContentWithThrottler but uses +// concurrent chunk prefetching to overlap network I/O. When prefetchAhead > 1, fetch +// goroutines establish HTTP connections to volume servers ahead of time, streaming data +// through io.Pipe with minimal memory overhead. +// +// prefetchAhead controls the number of chunks fetched concurrently: +// - 0 or 1: falls back to sequential fetching (same as PrepareStreamContentWithThrottler) +// - 2+: uses pipe-based prefetch pipeline with that many concurrent fetches +func PrepareStreamContentWithPrefetch(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64, prefetchAhead int) (DoStreamContent, error) { + if prefetchAhead <= 1 { + return PrepareStreamContentWithThrottler(ctx, masterClient, jwtFunc, chunks, offset, size, downloadMaxBytesPs) + } + + glog.V(4).InfofCtx(ctx, "prepare to stream content with prefetch=%d for chunks: %d", prefetchAhead, len(chunks)) + chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size) + + fileId2Url := make(map[string][]string) + + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + var urlStrings []string + var err error + for _, backoff := range getLookupFileIdBackoffSchedule { + if err := ctx.Err(); err != nil { + return nil, err + } + urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId) + if err == nil && len(urlStrings) > 0 { + break + } + if err := ctx.Err(); err != nil { + return nil, err + } + glog.V(4).InfofCtx(ctx, "waiting for chunk: %s", chunkView.FileId) + timer := 
time.NewTimer(backoff) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return nil, ctx.Err() + case <-timer.C: + } + } + if err != nil { + glog.V(1).InfofCtx(ctx, "operation LookupFileId %s failed, err: %v", chunkView.FileId, err) + return nil, err + } else if len(urlStrings) == 0 { + errUrlNotFound := fmt.Errorf("operation LookupFileId %s failed, err: urls not found", chunkView.FileId) + glog.ErrorCtx(ctx, errUrlNotFound) + return nil, errUrlNotFound + } + fileId2Url[chunkView.FileId] = urlStrings + } + + return func(writer io.Writer) error { + return streamChunksPrefetched(ctx, writer, chunkViews, fileId2Url, jwtFunc, masterClient, offset, size, downloadMaxBytesPs, prefetchAhead) + }, nil +} + func StreamContent(masterClient wdclient.HasLookupFileIdFunction, writer io.Writer, chunks []*filer_pb.FileChunk, offset int64, size int64) error { streamFn, err := PrepareStreamContent(masterClient, JwtForVolumeServer, chunks, offset, size) if err != nil { diff --git a/weed/filer/stream_benchmark_test.go b/weed/filer/stream_benchmark_test.go new file mode 100644 index 000000000..f0b76e0e7 --- /dev/null +++ b/weed/filer/stream_benchmark_test.go @@ -0,0 +1,317 @@ +package filer + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "strings" + "sync" + "testing" + "time" + + util_http "github.com/seaweedfs/seaweedfs/weed/util/http" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +func TestMain(m *testing.M) { + // Initialize the global HTTP client required by ReadUrlAsStream + util_http.InitGlobalHttpClient() + os.Exit(m.Run()) +} + +// mockMasterClientForBenchmark implements HasLookupFileIdFunction and CacheInvalidator +type mockMasterClientForBenchmark struct { + urls map[string][]string +} + +func (m *mockMasterClientForBenchmark) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { + return func(ctx context.Context, fileId string) 
([]string, error) { + if urls, ok := m.urls[fileId]; ok { + return urls, nil + } + return nil, fmt.Errorf("fileId %s not found", fileId) + } +} + +func (m *mockMasterClientForBenchmark) InvalidateCache(fileId string) {} + +// noopJwtFunc returns empty JWT for testing +func noopJwtFunc(fileId string) string { + return "" +} + +// createMockVolumeServer creates an httptest server that serves chunk data +// with configurable per-request latency to simulate network conditions. +// The latency is applied once per request (simulating RTT), not per byte. +func createMockVolumeServer(chunkData map[string][]byte, latency time.Duration) *httptest.Server { + var mu sync.RWMutex + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate network latency (connection setup + RTT) + if latency > 0 { + time.Sleep(latency) + } + + // Extract fileId from path (e.g., "/1,abc123") + path := r.URL.Path + if strings.HasPrefix(path, "/") { + path = path[1:] + } + + mu.RLock() + data, ok := chunkData[path] + mu.RUnlock() + if !ok { + http.Error(w, "not found", http.StatusNotFound) + return + } + + // Handle Range header + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + var start, end int64 + fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end) + if start >= 0 && end < int64(len(data)) && start <= end { + w.Header().Set("Content-Length", fmt.Sprintf("%d", end-start+1)) + w.WriteHeader(http.StatusPartialContent) + w.Write(data[start : end+1]) + return + } + } + + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(data))) + w.WriteHeader(http.StatusOK) + w.Write(data) + })) +} + +// benchmarkConfig holds parameters for a single benchmark scenario +type benchmarkConfig struct { + numChunks int + chunkSize int + latency time.Duration + prefetch int // 0 = sequential +} + +func (c benchmarkConfig) name() string { + name := fmt.Sprintf("chunks=%d/size=%dKB/latency=%dms", + c.numChunks, c.chunkSize/1024, c.latency.Milliseconds()) + 
if c.prefetch > 0 { + name += fmt.Sprintf("/prefetch=%d", c.prefetch) + } + return name +} + +// setupBenchmark creates mock infrastructure and returns chunks, master client, and cleanup func +func setupBenchmark(b *testing.B, cfg benchmarkConfig) ([]*filer_pb.FileChunk, *mockMasterClientForBenchmark, func()) { + b.Helper() + + // Generate random chunk data + chunkData := make(map[string][]byte, cfg.numChunks) + chunks := make([]*filer_pb.FileChunk, cfg.numChunks) + + for i := 0; i < cfg.numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + data := make([]byte, cfg.chunkSize) + rand.Read(data) + chunkData[fileId] = data + + chunks[i] = &filer_pb.FileChunk{ + FileId: fileId, + Offset: int64(i * cfg.chunkSize), + Size: uint64(cfg.chunkSize), + ModifiedTsNs: int64(i), + Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + } + + // Start mock volume server + server := createMockVolumeServer(chunkData, cfg.latency) + + // Build URL map + urls := make(map[string][]string, cfg.numChunks) + for i := 0; i < cfg.numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + urls[fileId] = []string{server.URL + "/" + fileId} + } + + masterClient := &mockMasterClientForBenchmark{urls: urls} + cleanup := func() { server.Close() } + + return chunks, masterClient, cleanup +} + +// runSequentialBenchmark runs the current sequential streaming path +func runSequentialBenchmark(b *testing.B, cfg benchmarkConfig) { + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + b.ResetTimer() + b.SetBytes(totalSize) + + for i := 0; i < b.N; i++ { + streamFn, err := PrepareStreamContentWithThrottler( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, // no throttle + ) + if err != nil { + b.Fatal(err) + } + if err := streamFn(io.Discard); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStreamSequential benchmarks the current sequential streaming path. 
+// This provides the BEFORE baseline for comparison. +func BenchmarkStreamSequential(b *testing.B) { + configs := []benchmarkConfig{ + // Pure throughput (no latency) + {numChunks: 16, chunkSize: 64 * 1024, latency: 0}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 0}, + // Moderate latency — shows RTT gap overhead + {numChunks: 16, chunkSize: 64 * 1024, latency: 5 * time.Millisecond}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond}, + // High latency — significant RTT overhead + {numChunks: 16, chunkSize: 64 * 1024, latency: 20 * time.Millisecond}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 10 * time.Millisecond}, + } + + for _, cfg := range configs { + b.Run(cfg.name(), func(b *testing.B) { + runSequentialBenchmark(b, cfg) + }) + } +} + +// BenchmarkStreamSequentialVerify is a quick functional test that the benchmark +// infrastructure works correctly — ensures data integrity through the pipeline. +func BenchmarkStreamSequentialVerify(b *testing.B) { + cfg := benchmarkConfig{numChunks: 4, chunkSize: 1024, latency: 0} + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + streamFn, err := PrepareStreamContentWithThrottler( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, + ) + if err != nil { + b.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + b.Fatal(err) + } + + if buf.Len() != int(totalSize) { + b.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } +} + +// runPrefetchBenchmark runs the new prefetch streaming path +func runPrefetchBenchmark(b *testing.B, cfg benchmarkConfig) { + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + b.ResetTimer() + b.SetBytes(totalSize) + + for i := 0; i < b.N; i++ { + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), + masterClient, 
+ noopJwtFunc, + chunks, + 0, + totalSize, + 0, // no throttle + cfg.prefetch, + ) + if err != nil { + b.Fatal(err) + } + if err := streamFn(io.Discard); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkStreamPrefetch benchmarks the new prefetch streaming path. +// Compare against BenchmarkStreamSequential for the AFTER measurement. +func BenchmarkStreamPrefetch(b *testing.B) { + configs := []benchmarkConfig{ + // Pure throughput (no latency) — should be similar to sequential + {numChunks: 16, chunkSize: 64 * 1024, latency: 0, prefetch: 4}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 0, prefetch: 4}, + // Moderate latency — prefetch should eliminate most RTT overhead + {numChunks: 16, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 4}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 4}, + // High latency — most benefit from prefetch + {numChunks: 16, chunkSize: 64 * 1024, latency: 20 * time.Millisecond, prefetch: 4}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 10 * time.Millisecond, prefetch: 4}, + // Vary prefetch count with moderate latency + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 2}, + {numChunks: 64, chunkSize: 64 * 1024, latency: 5 * time.Millisecond, prefetch: 8}, + } + + for _, cfg := range configs { + b.Run(cfg.name(), func(b *testing.B) { + runPrefetchBenchmark(b, cfg) + }) + } +} + +// BenchmarkStreamPrefetchVerify verifies data integrity through the prefetch pipeline. 
+func BenchmarkStreamPrefetchVerify(b *testing.B) { + cfg := benchmarkConfig{numChunks: 4, chunkSize: 1024, latency: 0, prefetch: 4} + chunks, masterClient, cleanup := setupBenchmark(b, cfg) + defer cleanup() + + totalSize := int64(cfg.numChunks * cfg.chunkSize) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), + masterClient, + noopJwtFunc, + chunks, + 0, + totalSize, + 0, + cfg.prefetch, + ) + if err != nil { + b.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + b.Fatal(err) + } + + if buf.Len() != int(totalSize) { + b.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } +} diff --git a/weed/filer/stream_prefetch.go b/weed/filer/stream_prefetch.go new file mode 100644 index 000000000..511f2b90c --- /dev/null +++ b/weed/filer/stream_prefetch.go @@ -0,0 +1,274 @@ +package filer + +import ( + "context" + "fmt" + "io" + "sync" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/mem" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// chunkPipeResult represents a prefetched chunk streaming through a pipe. +// The fetch goroutine writes data into the pipeWriter; the consumer reads from pipeReader. +type chunkPipeResult struct { + chunkView *ChunkView + reader *io.PipeReader + fetchErr error // final error from fetch goroutine + written int64 // bytes written by fetch goroutine + done chan struct{} // closed when fetch goroutine finishes + urlStrings []string // snapshot of URLs at dispatch time (for retry logic) +} + +// streamChunksPrefetched streams chunks with concurrent prefetch using io.Pipe. +// +// For each chunk in file order, a goroutine is launched (bounded by a semaphore) +// that establishes an HTTP connection to the volume server and streams data through +// an io.Pipe. The consumer reads from pipes in order, writing to the response. 
+// +// Memory usage is minimal: pipes are synchronous (no buffering), and only one +// reusable copy buffer is allocated for the consumer. +func streamChunksPrefetched( + ctx context.Context, + writer io.Writer, + chunkViews *IntervalList[*ChunkView], + fileId2Url map[string][]string, + jwtFunc VolumeServerJwtFunction, + masterClient wdclient.HasLookupFileIdFunction, + offset int64, + size int64, + downloadMaxBytesPs int64, + prefetchAhead int, +) error { + downloadThrottler := util.NewWriteThrottler(downloadMaxBytesPs) + + // Create a local cancellable context so the consumer can stop the producer + // and all in-flight fetch goroutines on error (e.g., client disconnect). + localCtx, localCancel := context.WithCancel(ctx) + defer localCancel() + + // Ordered channel: one entry per chunk, in file order. + // Capacity = prefetchAhead so the producer can run ahead. + // Uses pointer to avoid copying the struct while fetch goroutines write to it. + results := make(chan *chunkPipeResult, prefetchAhead) + + // Semaphore to limit concurrent fetch goroutines (and thus HTTP connections). + sem := make(chan struct{}, prefetchAhead) + + // Producer: walks chunk list, launches fetch goroutines, sends results in order. + // The producer only reads from fileId2Url (populated before streaming starts), + // so there is no concurrent map access — the consumer never writes to it. 
+ var producerWg sync.WaitGroup + producerWg.Add(1) + go func() { + defer producerWg.Done() + defer close(results) + + for x := chunkViews.Front(); x != nil; x = x.Next { + chunkView := x.Value + + // Check context before starting new fetch + select { + case <-localCtx.Done(): + return + default: + } + + // Acquire semaphore slot (bounds concurrent HTTP connections) + select { + case sem <- struct{}{}: + case <-localCtx.Done(): + return + } + + pr, pw := io.Pipe() + urlStrings := fileId2Url[chunkView.FileId] + jwt := jwtFunc(chunkView.FileId) + + result := &chunkPipeResult{ + chunkView: chunkView, + reader: pr, + done: make(chan struct{}), + urlStrings: urlStrings, + } + + // Launch fetch goroutine + go func(cv *ChunkView, urls []string, jwt string, pw *io.PipeWriter, res *chunkPipeResult) { + defer func() { <-sem }() // release semaphore + defer close(res.done) + + written, err := retriedStreamFetchChunkData( + localCtx, pw, urls, jwt, + cv.CipherKey, cv.IsGzipped, cv.IsFullChunk(), + cv.OffsetInChunk, int(cv.ViewSize), + ) + res.written = written + res.fetchErr = err + + if err != nil { + pw.CloseWithError(err) + } else { + pw.Close() + } + }(chunkView, urlStrings, jwt, pw, result) + + // Send result to consumer (blocks if channel full, back-pressuring producer) + select { + case results <- result: + case <-localCtx.Done(): + // Consumer gone; close the pipe and wait for the fetch goroutine + // to finish so we don't leak it (this result was never sent to + // the channel, so the drain loop won't handle it). + pr.Close() + <-result.done + return + } + } + }() + + // Consumer: reads from results channel in order, writes to response writer. + // Use the SeaweedFS memory pool for the copy buffer to reduce GC pressure. 
+ copyBuf := mem.Allocate(256 * 1024) + defer mem.Free(copyBuf) + remaining := size + + var consumeErr error + for result := range results { + chunkView := result.chunkView + + // Handle gap before this chunk (zero-fill) + if offset < chunkView.ViewOffset { + gap := chunkView.ViewOffset - offset + remaining -= gap + glog.V(4).InfofCtx(ctx, "prefetch zero [%d,%d)", offset, chunkView.ViewOffset) + if err := writeZero(writer, gap); err != nil { + consumeErr = fmt.Errorf("write zero [%d,%d): %w", offset, chunkView.ViewOffset, err) + result.reader.Close() + break + } + offset = chunkView.ViewOffset + } + + // Stream chunk data from pipe to response + start := time.Now() + _, copyErr := io.CopyBuffer(writer, result.reader, copyBuf) + result.reader.Close() + + // Wait for fetch goroutine to finish to get final error + <-result.done + + // Determine the effective error + err := copyErr + if err == nil && result.fetchErr != nil && result.written == 0 { + err = result.fetchErr + } + + // If the fetcher itself failed before writing any data, try cache invalidation + // + re-fetch (same as sequential path stream.go:197). We check result.fetchErr + // and result.written (not copied) to avoid wrongly retrying when the fetch + // succeeded but the response writer failed on the first write. 
+		if result.fetchErr != nil && result.written == 0 {
+			if err := localCtx.Err(); err != nil {
+				consumeErr = err
+				break
+			}
+			retryErr := retryWithCacheInvalidation(localCtx, writer, chunkView, result.urlStrings, jwtFunc, masterClient)
+			if retryErr != nil {
+				stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
+				consumeErr = fmt.Errorf("read chunk: %w", retryErr)
+				break
+			}
+			// Retry succeeded; fall through to the per-chunk accounting below.
+			// (err is not read again this iteration, so no reset is needed.)
+		} else if err != nil {
+			if localCtx.Err() != nil {
+				consumeErr = localCtx.Err()
+			} else {
+				stats.FilerHandlerCounter.WithLabelValues("chunkDownloadError").Inc()
+				consumeErr = fmt.Errorf("read chunk: %w", err)
+			}
+			break
+		}
+
+		offset += int64(chunkView.ViewSize)
+		remaining -= int64(chunkView.ViewSize)
+		stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
+		stats.FilerHandlerCounter.WithLabelValues("chunkDownload").Inc()
+		downloadThrottler.MaybeSlowdown(int64(chunkView.ViewSize))
+	}
+
+	// Cancel the local context to stop the producer and any in-flight fetchers early.
+	// This ensures goroutines don't linger after the consumer exits (e.g., on write error).
+	localCancel()
+
+	// Drain remaining results to close pipes and unblock fetch goroutines
+	for result := range results {
+		result.reader.Close()
+		<-result.done
+	}
+
+	// Wait for producer to finish
+	producerWg.Wait()
+
+	if consumeErr != nil {
+		return consumeErr
+	}
+
+	// Handle trailing zero-fill
+	if remaining > 0 {
+		glog.V(4).InfofCtx(ctx, "prefetch zero [%d,%d)", offset, offset+remaining)
+		if err := writeZero(writer, remaining); err != nil {
+			return fmt.Errorf("write zero [%d,%d): %w", offset, offset+remaining, err)
+		}
+	}
+
+	return nil
+}
+
+// retryWithCacheInvalidation attempts to re-fetch a chunk after invalidating the URL cache.
+// This mirrors the retry logic in PrepareStreamContentWithThrottler's sequential path.
+func retryWithCacheInvalidation( + ctx context.Context, + writer io.Writer, + chunkView *ChunkView, + oldUrlStrings []string, + jwtFunc VolumeServerJwtFunction, + masterClient wdclient.HasLookupFileIdFunction, +) error { + invalidator, ok := masterClient.(CacheInvalidator) + if !ok { + return fmt.Errorf("read chunk %s failed and no cache invalidator available", chunkView.FileId) + } + + glog.V(0).InfofCtx(ctx, "prefetch read chunk %s failed, invalidating cache and retrying", chunkView.FileId) + invalidator.InvalidateCache(chunkView.FileId) + + newUrlStrings, lookupErr := masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId) + if lookupErr != nil { + glog.WarningfCtx(ctx, "failed to re-lookup chunk %s after cache invalidation: %v", chunkView.FileId, lookupErr) + return fmt.Errorf("re-lookup chunk %s: %w", chunkView.FileId, lookupErr) + } + if len(newUrlStrings) == 0 { + glog.WarningfCtx(ctx, "re-lookup for chunk %s returned no locations, skipping retry", chunkView.FileId) + return fmt.Errorf("re-lookup chunk %s: no locations", chunkView.FileId) + } + + if urlSlicesEqual(oldUrlStrings, newUrlStrings) { + glog.V(0).InfofCtx(ctx, "re-lookup returned same locations for chunk %s, skipping retry", chunkView.FileId) + return fmt.Errorf("read chunk %s failed, same locations after cache invalidation", chunkView.FileId) + } + + glog.V(0).InfofCtx(ctx, "retrying read chunk %s with new locations: %v", chunkView.FileId, newUrlStrings) + jwt := jwtFunc(chunkView.FileId) + _, err := retriedStreamFetchChunkData( + ctx, writer, newUrlStrings, jwt, + chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), + chunkView.OffsetInChunk, int(chunkView.ViewSize), + ) + return err +} diff --git a/weed/filer/stream_prefetch_test.go b/weed/filer/stream_prefetch_test.go new file mode 100644 index 000000000..ee09178a1 --- /dev/null +++ b/weed/filer/stream_prefetch_test.go @@ -0,0 +1,365 @@ +package filer + +import ( + "bytes" + "context" + "fmt" + "io" + "math/rand" + 
"net/http" + "net/http/httptest" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/wdclient" +) + +// testMasterClient implements HasLookupFileIdFunction and CacheInvalidator for tests +type testMasterClient struct { + urls map[string][]string + invalidatedCount int32 +} + +func (m *testMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { + return func(ctx context.Context, fileId string) ([]string, error) { + if urls, ok := m.urls[fileId]; ok { + return urls, nil + } + return nil, fmt.Errorf("fileId %s not found", fileId) + } +} + +func (m *testMasterClient) InvalidateCache(fileId string) { + atomic.AddInt32(&m.invalidatedCount, 1) +} + +func noopJwt(fileId string) string { return "" } + +// createTestServer creates a mock volume server that serves chunk data. +// Supports Range header for partial chunk reads (exercising OffsetInChunk paths). +func createTestServer(chunkData map[string][]byte) *httptest.Server { + var mu sync.RWMutex + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + path := r.URL.Path + if strings.HasPrefix(path, "/") { + path = path[1:] + } + mu.RLock() + data, ok := chunkData[path] + mu.RUnlock() + if !ok { + http.Error(w, "not found", http.StatusNotFound) + return + } + + // Handle Range header for partial chunk reads + rangeHeader := r.Header.Get("Range") + if rangeHeader != "" { + var start, end int64 + if _, err := fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end); err == nil { + if start < 0 || end >= int64(len(data)) || start > end { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", len(data))) + w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) + return + } + rangeData := data[start : end+1] + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, len(data))) + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(rangeData))) + 
w.WriteHeader(http.StatusPartialContent) + w.Write(rangeData) + return + } + } + + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(data))) + w.WriteHeader(http.StatusOK) + w.Write(data) + })) +} + +// makeChunksAndServer creates N chunks of given size, a mock server, and a master client +func makeChunksAndServer(t *testing.T, numChunks, chunkSize int) ([]*filer_pb.FileChunk, *testMasterClient, map[string][]byte, func()) { + t.Helper() + + chunkData := make(map[string][]byte, numChunks) + chunks := make([]*filer_pb.FileChunk, numChunks) + + for i := 0; i < numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + data := make([]byte, chunkSize) + rand.Read(data) + chunkData[fileId] = data + + chunks[i] = &filer_pb.FileChunk{ + FileId: fileId, + Offset: int64(i * chunkSize), + Size: uint64(chunkSize), + ModifiedTsNs: int64(i), + Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + } + + server := createTestServer(chunkData) + urls := make(map[string][]string, numChunks) + for i := 0; i < numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + urls[fileId] = []string{server.URL + "/" + fileId} + } + + masterClient := &testMasterClient{urls: urls} + return chunks, masterClient, chunkData, func() { server.Close() } +} + +// TestPrefetchInOrderDelivery verifies chunks are written to the output in correct file order +func TestPrefetchInOrderDelivery(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 8, 4096) + defer cleanup() + + totalSize := int64(8 * 4096) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 4, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + // Verify total size + if buf.Len() != int(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } + + // Verify data matches chunk-by-chunk in order + result := buf.Bytes() + for i := 0; i < 8; i++ { + 
fileId := fmt.Sprintf("1,%x", i) + expected := chunkData[fileId] + got := result[i*4096 : (i+1)*4096] + if !bytes.Equal(expected, got) { + t.Fatalf("chunk %d (%s) data mismatch at offset %d", i, fileId, i*4096) + } + } +} + +// TestPrefetchSingleChunk verifies the pipeline works with just one chunk +func TestPrefetchSingleChunk(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 1, 8192) + defer cleanup() + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, 8192, 0, 4, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + expected := chunkData["1,0"] + if !bytes.Equal(expected, buf.Bytes()) { + t.Fatal("single chunk data mismatch") + } +} + +// TestPrefetchFallbackToSequential verifies prefetch=1 falls back to sequential path +func TestPrefetchFallbackToSequential(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 4, 1024) + defer cleanup() + + totalSize := int64(4 * 1024) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 1, // prefetch=1 -> sequential + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + if buf.Len() != int(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } + + // Verify data order + result := buf.Bytes() + for i := 0; i < 4; i++ { + fileId := fmt.Sprintf("1,%x", i) + expected := chunkData[fileId] + got := result[i*1024 : (i+1)*1024] + if !bytes.Equal(expected, got) { + t.Fatalf("chunk %d data mismatch", i) + } + } +} + +// TestPrefetchContextCancellation verifies all goroutines clean up on cancellation +func TestPrefetchContextCancellation(t *testing.T) { + // Use a slow server so cancellation happens mid-stream + var requestCount int32 + server := 
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + atomic.AddInt32(&requestCount, 1) + // Slow response + time.Sleep(100 * time.Millisecond) + w.Header().Set("Content-Length", "1024") + w.WriteHeader(http.StatusOK) + w.Write(make([]byte, 1024)) + })) + defer server.Close() + + numChunks := 16 + chunks := make([]*filer_pb.FileChunk, numChunks) + urls := make(map[string][]string, numChunks) + for i := 0; i < numChunks; i++ { + fileId := fmt.Sprintf("1,%x", i) + chunks[i] = &filer_pb.FileChunk{ + FileId: fileId, Offset: int64(i * 1024), Size: 1024, + ModifiedTsNs: int64(i), Fid: &filer_pb.FileId{FileKey: uint64(i)}, + } + urls[fileId] = []string{server.URL + "/" + fileId} + } + masterClient := &testMasterClient{urls: urls} + + // Cancel after a short time + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + streamFn, err := PrepareStreamContentWithPrefetch( + ctx, masterClient, noopJwt, + chunks, 0, int64(numChunks*1024), 0, 4, + ) + if err != nil { + // URL resolution may fail due to cancellation — that's expected + return + } + + err = streamFn(io.Discard) + if err == nil { + t.Fatal("expected error from cancelled context") + } + + // Verify not all chunks were requested (cancellation stopped early) + reqs := atomic.LoadInt32(&requestCount) + if reqs >= int32(numChunks) { + t.Logf("warning: all %d chunks were requested despite cancellation (got %d)", numChunks, reqs) + } +} + +// TestPrefetchRangeRequest verifies prefetch works with offset/size subset +func TestPrefetchRangeRequest(t *testing.T) { + chunks, masterClient, chunkData, cleanup := makeChunksAndServer(t, 8, 4096) + defer cleanup() + + // Request only chunks 2-5 (offset=8192, size=16384) + offset := int64(2 * 4096) + size := int64(4 * 4096) + + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, offset, size, 0, 4, + ) + if err != nil { + t.Fatal(err) + } + + var buf 
bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + if buf.Len() != int(size) { + t.Fatalf("expected %d bytes, got %d", size, buf.Len()) + } + + // Verify data matches chunks 2-5 + result := buf.Bytes() + for i := 2; i < 6; i++ { + fileId := fmt.Sprintf("1,%x", i) + expected := chunkData[fileId] + start := (i - 2) * 4096 + got := result[start : start+4096] + if !bytes.Equal(expected, got) { + t.Fatalf("chunk %d data mismatch in range request", i) + } + } +} + +// TestPrefetchLargePrefetchCount verifies prefetch > numChunks is handled gracefully +func TestPrefetchLargePrefetchCount(t *testing.T) { + chunks, masterClient, _, cleanup := makeChunksAndServer(t, 3, 1024) + defer cleanup() + + totalSize := int64(3 * 1024) + + // prefetch=10 but only 3 chunks — should work fine + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 10, + ) + if err != nil { + t.Fatal(err) + } + + var buf bytes.Buffer + if err := streamFn(&buf); err != nil { + t.Fatal(err) + } + + if buf.Len() != int(totalSize) { + t.Fatalf("expected %d bytes, got %d", totalSize, buf.Len()) + } +} + +// TestPrefetchConcurrentDownloads verifies multiple concurrent prefetch streams +func TestPrefetchConcurrentDownloads(t *testing.T) { + chunks, masterClient, _, cleanup := makeChunksAndServer(t, 8, 2048) + defer cleanup() + + totalSize := int64(8 * 2048) + + var wg sync.WaitGroup + errors := make(chan error, 4) + + for i := 0; i < 4; i++ { + wg.Add(1) + go func() { + defer wg.Done() + streamFn, err := PrepareStreamContentWithPrefetch( + context.Background(), masterClient, noopJwt, + chunks, 0, totalSize, 0, 4, + ) + if err != nil { + errors <- err + return + } + if err := streamFn(io.Discard); err != nil { + errors <- err + } + }() + } + + wg.Wait() + close(errors) + + for err := range errors { + t.Fatalf("concurrent download error: %v", err) + } +} diff --git a/weed/s3api/s3api_object_handlers.go 
b/weed/s3api/s3api_object_handlers.go index 7522ccd2d..e0f05f16b 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -1053,7 +1053,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R // Prepare streaming function with simple master client wrapper tStreamPrep := time.Now() // Use filerClient directly (not wrapped) so it can support cache invalidation - streamFn, err := filer.PrepareStreamContentWithThrottler( + streamFn, err := filer.PrepareStreamContentWithPrefetch( ctx, s3a.filerClient, filer.JwtForVolumeServer, // Use filer's JWT function (loads config once, generates JWT locally) @@ -1061,6 +1061,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R offset, size, 0, // no throttling + 4, // prefetch 4 chunks ahead for overlapped fetching ) streamPrepTime = time.Since(tStreamPrep) if err != nil { @@ -1928,7 +1929,7 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry } // Create streaming reader - use filerClient directly for cache invalidation support - streamFn, err := filer.PrepareStreamContentWithThrottler( + streamFn, err := filer.PrepareStreamContentWithPrefetch( ctx, s3a.filerClient, filer.JwtForVolumeServer, // Use filer's JWT function (loads config once, generates JWT locally) @@ -1936,6 +1937,7 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry 0, totalSize, 0, + 4, // prefetch 4 chunks ahead for overlapped fetching ) if err != nil { return nil, err diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 3db936f43..61a87cd97 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -223,7 +223,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) // Matches S3 API behavior. Request context (ctx) is used for metadata operations above. 
streamCtx, streamCancel := context.WithCancel(context.WithoutCancel(ctx)) - streamFn, err := filer.PrepareStreamContentWithThrottler(streamCtx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs) + streamFn, err := filer.PrepareStreamContentWithPrefetch(streamCtx, fs.filer.MasterClient, fs.maybeGetVolumeReadJwtAuthorizationToken, chunks, offset, size, fs.option.DownloadMaxBytesPs, 4) if err != nil { streamCancel() stats.FilerHandlerCounter.WithLabelValues(stats.ErrorReadStream).Inc() diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go index 62e723d38..f8d20ae67 100644 --- a/weed/util/http/http_global_client_util.go +++ b/weed/util/http/http_global_client_util.go @@ -372,7 +372,7 @@ func ReadUrlAsStream(ctx context.Context, fileUrl, jwt string, cipherKey []byte, var ( m int ) - buf := mem.Allocate(64 * 1024) + buf := mem.Allocate(256 * 1024) defer mem.Free(buf) for {