seaweedFS/weed/server/filer_server_tus_handlers.go
Chris Lu 1b1e5f69a2 Add TUS protocol support for resumable uploads (#7592)
* Add TUS protocol integration tests

This commit adds integration tests for the TUS (resumable upload) protocol
in preparation for implementing TUS support in the filer.

Test coverage includes:
- OPTIONS handler for capability discovery
- Basic single-request upload
- Chunked/resumable uploads
- HEAD requests for offset tracking
- DELETE for upload cancellation
- Error handling (invalid offsets, missing uploads)
- Creation-with-upload extension
- Resume after interruption simulation

Tests are skipped in short mode and require a running SeaweedFS cluster.

* Add TUS session storage types and utilities

Implements TUS upload session management:
- TusSession struct for tracking upload state
- Session creation with directory-based storage
- Session persistence using filer entries
- Session retrieval and offset updates
- Session deletion with chunk cleanup
- Upload completion with chunk assembly into final file

Session data is stored in /.uploads.tus/{upload-id}/ directory,
following the pattern used by S3 multipart uploads.
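
For reference, a minimal sketch of what such a session record might hold, based
on the fields the HTTP handlers read (ID, TargetPath, Size, Offset). The
Metadata and CreatedAt fields are assumptions; the authoritative struct lives in
the session-storage file and may differ:

    // Hypothetical shape of a TUS upload session (illustrative only).
    type TusSession struct {
        ID         string            // upload ID, also the per-upload directory name
        TargetPath string            // final destination path in the filer
        Size       int64             // declared total size (Upload-Length)
        Offset     int64             // bytes persisted so far, contiguous from 0
        Metadata   map[string]string // decoded Upload-Metadata pairs (assumed field)
        CreatedAt  int64             // creation time, used for expiry (assumed field)
    }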

* Add TUS HTTP handlers

Implements TUS protocol HTTP handlers:
- tusHandler: Main entry point routing requests
- tusOptionsHandler: Capability discovery (OPTIONS)
- tusCreateHandler: Create new upload (POST)
- tusHeadHandler: Get upload offset (HEAD)
- tusPatchHandler: Upload data at offset (PATCH)
- tusDeleteHandler: Cancel upload (DELETE)
- tusWriteData: Upload data to volume servers

Features:
- Supports creation-with-upload extension
- Validates TUS protocol headers
- Offset conflict detection
- Automatic upload completion when size is reached
- Metadata parsing from Upload-Metadata header

* Wire up TUS protocol routes in filer server

Add TUS handler route (/.tus/) to the filer HTTP server.
The TUS route is registered before the catch-all route to ensure
proper routing of TUS protocol requests.

TUS protocol is now accessible at:
- OPTIONS /.tus/ - Capability discovery
- POST /.tus/{path} - Create upload
- HEAD /.tus/.uploads/{id} - Get offset
- PATCH /.tus/.uploads/{id} - Upload data
- DELETE /.tus/.uploads/{id} - Cancel upload
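
A rough Go client walk-through of that flow, assuming a filer listening on
localhost:8888 and an illustrative target path (/buckets/test/hello.txt); error
handling on request construction is skipped for brevity:

    package main

    import (
        "bytes"
        "fmt"
        "log"
        "net/http"
        "strconv"
    )

    func main() {
        filer := "http://localhost:8888" // assumed filer address
        target := "/.tus/buckets/test/hello.txt"
        data := []byte("hello, tus")

        // 1. Create the upload: POST the target path under the TUS base path.
        req, _ := http.NewRequest(http.MethodPost, filer+target, nil)
        req.Header.Set("Tus-Resumable", "1.0.0")
        req.Header.Set("Upload-Length", strconv.Itoa(len(data)))
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            log.Fatal(err)
        }
        location := resp.Header.Get("Location") // e.g. /.tus/.uploads/{id}
        resp.Body.Close()

        // 2. Send the bytes: PATCH the upload location at offset 0.
        req, _ = http.NewRequest(http.MethodPatch, filer+location, bytes.NewReader(data))
        req.Header.Set("Tus-Resumable", "1.0.0")
        req.Header.Set("Upload-Offset", "0")
        req.Header.Set("Content-Type", "application/offset+octet-stream")
        if resp, err = http.DefaultClient.Do(req); err != nil {
            log.Fatal(err)
        }
        resp.Body.Close()

        // 3. Ask for progress: HEAD returns the current Upload-Offset.
        req, _ = http.NewRequest(http.MethodHead, filer+location, nil)
        req.Header.Set("Tus-Resumable", "1.0.0")
        if resp, err = http.DefaultClient.Do(req); err != nil {
            log.Fatal(err)
        }
        fmt.Println("uploaded:", resp.Header.Get("Upload-Offset"), "of", len(data), "bytes")
        resp.Body.Close()
    }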

* Improve TUS integration test setup

Add comprehensive Makefile for TUS tests with targets:
- test-with-server: Run tests with automatic server management
- test-basic/chunked/resume/errors: Specific test categories
- manual-start/stop: For development testing
- debug-logs/status: For debugging
- ci-test: For CI/CD pipelines

Update README.md with:
- Detailed TUS protocol documentation
- All endpoint descriptions with headers
- Usage examples with curl commands
- Architecture diagram
- Comparison with S3 multipart uploads

Follows the pattern established by other tests in test/ folder.

* Fix TUS integration tests and creation-with-upload

- Fix test URLs to use full URLs instead of relative paths
- Fix creation-with-upload to refresh session before completing
- Fix Makefile to properly handle test cleanup
- Add FullURL helper function to TestCluster

* Add TUS protocol tests to GitHub Actions CI

- Add tus-tests.yml workflow that runs on PRs and pushes
- Runs when TUS-related files are modified
- Automatic server management for integration testing
- Upload logs on failure for debugging

* Make TUS base path configurable via CLI

- Add -tus.path CLI flag to filer command
- TUS is disabled by default (empty path)
- Example: -tus.path=/.tus to enable at /.tus endpoint
- Update test Makefile to use -tus.path flag
- Update README with TUS enabling instructions

* Rename -tus.path to -tusBasePath with default .tus

- Rename CLI flag from -tus.path to -tusBasePath
- Default to .tus (TUS enabled by default)
- Add -filer.tusBasePath option to weed server command
- Properly handle path prefix (prepend / if missing)
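
A minimal sketch of the normalization described here; the helper name
normalizeTusBasePath is hypothetical, and the exact code lives in
filer_server.go rather than in the handlers file:

    package main

    import (
        "fmt"
        "strings"
    )

    // normalizeTusBasePath ensures a leading slash and no trailing slash,
    // so ".tus", "/.tus" and "/.tus/" all normalize to "/.tus".
    func normalizeTusBasePath(p string) string {
        if p == "" {
            return "" // empty disables TUS
        }
        if !strings.HasPrefix(p, "/") {
            p = "/" + p
        }
        return strings.TrimSuffix(p, "/")
    }

    func main() {
        fmt.Println(normalizeTusBasePath(".tus"))   // "/.tus"
        fmt.Println(normalizeTusBasePath("/.tus/")) // "/.tus"
    }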

* Address code review comments

- Sort chunks by offset before assembling final file
- Use chunk.Offset directly instead of recalculating
- Return error on invalid file ID instead of skipping
- Require Content-Length header for PATCH requests
- Use fs.option.Cipher for encryption setting
- Detect MIME type from data using http.DetectContentType
- Fix concurrency group for push events in workflow
- Use os.Interrupt instead of Kill for graceful shutdown in tests

* fmt

* Address remaining code review comments

- Fix potential open redirect vulnerability by sanitizing uploadLocation path
- Add language specifier to README code block
- Handle os.Create errors in test setup
- Use waitForHTTPServer instead of time.Sleep for master/volume readiness
- Improve test reliability and debugging

* Address critical and high-priority review comments

- Add per-session locking to prevent race conditions in updateTusSessionOffset
- Stream data directly to volume server instead of buffering entire chunk
- Only buffer 512 bytes for MIME type detection, then stream remaining data
- Clean up session locks when session is deleted

* Fix race condition to work across multiple filer instances

- Store each chunk as a separate file entry instead of updating session JSON
- Chunk file names encode offset, size, and fileId for atomic storage
- getTusSession loads chunks from directory listing (atomic read)
- Eliminates read-modify-write race condition across multiple filers
- Remove in-memory mutex that only worked for single filer instance

* Address code review comments: fix variable shadowing, sniff size, and test stability

- Rename path variable to reqPath to avoid shadowing path package
- Make sniff buffer size respect contentLength (read at most contentLength bytes)
- Handle Content-Length < 0 in creation-with-upload (return error for chunked encoding)
- Fix test cluster: use temp directory for filer store, add startup delay

* Fix test stability: increase cluster stabilization delay to 5 seconds

The tests were intermittently failing because the volume server needed more
time to create volumes and register with the master. Increasing the delay
from 2 to 5 seconds fixes the flaky test behavior.

* Address PR review comments for TUS protocol support

- Fix strconv.Atoi error handling in test file (lines 386, 747)
- Fix lossy fileId encoding: use base64 instead of underscore replacement
- Add pagination support for ListDirectoryEntries in getTusSession
- Batch delete chunks instead of one-by-one in deleteTusSession

* Address additional PR review comments for TUS protocol

- Fix UploadAt timestamp: use entry.Crtime instead of time.Now()
- Remove redundant JSON content in chunk entry (metadata in filename)
- Refactor tusWriteData to stream in 4MB chunks to avoid OOM on large uploads
- Pass filer.Entry to parseTusChunkPath to preserve actual upload time

* Address more PR review comments for TUS protocol

- Normalize TUS path once in filer_server.go, store in option.TusPath
- Remove redundant path normalization from TUS handlers
- Remove goto statement in tusCreateHandler, simplify control flow

* Remove unnecessary mutexes in tusWriteData

The upload loop is sequential, so uploadErrLock and chunksLock are not needed.

* Rename updateTusSessionOffset to saveTusChunk

Remove unused newOffset parameter and rename function to better reflect its purpose.

* Improve TUS upload performance and add path validation

- Reuse operation.Uploader across sub-chunks for better connection reuse
- Guard against TusPath='/' to prevent hijacking all filer routes

* Address PR review comments for TUS protocol

- Fix critical chunk filename parsing: use strings.Cut instead of SplitN
  to correctly handle base64-encoded fileIds that may contain underscores
  (see the sketch after this list)
- Rename tusPath to tusBasePath for naming consistency across codebase
- Add background garbage collection for expired TUS sessions (runs hourly)
- Improve error messages with %w wrapping for better debuggability
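
Putting the chunk-naming pieces together, a sketch of how such a name might be
encoded and parsed. The {offset}_{size}_{base64FileId} layout, the URL-safe
base64 variant, and the helper names are assumptions drawn from the bullets
above, not the exact code:

    package main

    import (
        "encoding/base64"
        "fmt"
        "strconv"
        "strings"
    )

    // encodeTusChunkName packs offset, size and fileId into a single file name.
    // The fileId is base64-encoded so unsafe characters never reach the name,
    // which means the encoded form may itself contain "_" (URL-safe alphabet).
    func encodeTusChunkName(offset, size int64, fileId string) string {
        encoded := base64.RawURLEncoding.EncodeToString([]byte(fileId))
        return fmt.Sprintf("%d_%d_%s", offset, size, encoded)
    }

    // decodeTusChunkName reverses encodeTusChunkName. strings.Cut is used so
    // only the first two underscores act as separators; any later underscores
    // belong to the base64-encoded fileId.
    func decodeTusChunkName(name string) (offset, size int64, fileId string, err error) {
        offsetStr, rest, ok := strings.Cut(name, "_")
        if !ok {
            return 0, 0, "", fmt.Errorf("malformed chunk name %q", name)
        }
        sizeStr, encoded, ok := strings.Cut(rest, "_")
        if !ok {
            return 0, 0, "", fmt.Errorf("malformed chunk name %q", name)
        }
        if offset, err = strconv.ParseInt(offsetStr, 10, 64); err != nil {
            return 0, 0, "", err
        }
        if size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil {
            return 0, 0, "", err
        }
        decoded, err := base64.RawURLEncoding.DecodeString(encoded)
        if err != nil {
            return 0, 0, "", err
        }
        return offset, size, string(decoded), nil
    }

    func main() {
        name := encodeTusChunkName(0, 4194304, "3,0123456789ab")
        fmt.Println(name)
        fmt.Println(decodeTusChunkName(name))
    }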

* Address additional TUS PR review comments

- Fix tusBasePath default to use leading slash (/.tus) for consistency
- Add chunk contiguity validation in completeTusUpload to detect gaps/overlaps
- Fix offset calculation to find the maximum contiguous range from 0, not just
  the last chunk (see the sketch after this list)
- Return 413 Request Entity Too Large instead of silently truncating content
- Document tusChunkSize rationale (4MB balances memory vs request overhead)
- Fix Makefile xargs portability by removing GNU-specific -r flag
- Add explicit -tusBasePath flag to integration test for robustness
- Fix README example to use /.uploads/tus path format
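
A sketch of the contiguity idea described above, assuming chunks carry Offset
and Size as in the handlers file; the chunkInfo type, the contiguousOffset
name, and the error text are illustrative:

    package main

    import (
        "fmt"
        "sort"
    )

    type chunkInfo struct {
        Offset int64
        Size   int64
    }

    // contiguousOffset returns the length of the gap-free, overlap-free prefix
    // starting at offset 0. The resumable offset is this value, not the end of
    // the last chunk, so a missing middle chunk is never silently skipped.
    func contiguousOffset(chunks []chunkInfo) (int64, error) {
        sort.Slice(chunks, func(i, j int) bool { return chunks[i].Offset < chunks[j].Offset })
        var next int64
        for _, c := range chunks {
            if c.Offset > next {
                break // gap: stop at the last contiguous byte
            }
            if c.Offset < next {
                return 0, fmt.Errorf("overlapping chunk at offset %d", c.Offset)
            }
            next = c.Offset + c.Size
        }
        return next, nil
    }

    func main() {
        chunks := []chunkInfo{{Offset: 0, Size: 4}, {Offset: 8, Size: 4}} // gap at [4,8)
        fmt.Println(contiguousOffset(chunks))                             // 4 <nil>
    }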

* Revert log_buffer changes (moved to separate PR)

* Minor style fixes from PR review

- Simplify tusBasePath flag description to use example format
- Add 'TUS upload' prefix to session not found error message
- Remove duplicate tusChunkSize comment
- Capitalize warning message for consistency
- Add grep filter to Makefile xargs for better empty input handling
2025-12-14 21:56:07 -08:00

462 lines
14 KiB
Go

package weed_server

import (
    "bytes"
    "context"
    "encoding/base64"
    "errors"
    "fmt"
    "io"
    "net/http"
    "path"
    "strconv"
    "strings"
    "time"

    "github.com/google/uuid"
    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/operation"
    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    "github.com/seaweedfs/seaweedfs/weed/stats"
    "github.com/seaweedfs/seaweedfs/weed/util"
)

// tusHandler is the main entry point for TUS protocol requests
func (fs *FilerServer) tusHandler(w http.ResponseWriter, r *http.Request) {
    // Set common TUS response headers
    w.Header().Set("Tus-Resumable", TusVersion)

    // Check Tus-Resumable header for non-OPTIONS requests
    if r.Method != http.MethodOptions {
        tusVersion := r.Header.Get("Tus-Resumable")
        if tusVersion != TusVersion {
            http.Error(w, "Unsupported TUS version", http.StatusPreconditionFailed)
            return
        }
    }

    // Route based on method and path
    reqPath := r.URL.Path
    // TusBasePath is pre-normalized in filer_server.go (leading slash, no trailing slash)
    tusPrefix := fs.option.TusBasePath

    // Check if this is an upload location (contains upload ID after {tusPrefix}/.uploads/)
    uploadsPrefix := tusPrefix + "/.uploads/"
    if strings.HasPrefix(reqPath, uploadsPrefix) {
        uploadID := strings.TrimPrefix(reqPath, uploadsPrefix)
        uploadID = strings.Split(uploadID, "/")[0] // Get just the ID, not any trailing path
        switch r.Method {
        case http.MethodHead:
            fs.tusHeadHandler(w, r, uploadID)
        case http.MethodPatch:
            fs.tusPatchHandler(w, r, uploadID)
        case http.MethodDelete:
            fs.tusDeleteHandler(w, r, uploadID)
        default:
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
        return
    }

    // Handle creation endpoints (POST to /.tus/{path})
    switch r.Method {
    case http.MethodOptions:
        fs.tusOptionsHandler(w, r)
    case http.MethodPost:
        fs.tusCreateHandler(w, r)
    default:
        w.WriteHeader(http.StatusMethodNotAllowed)
    }
}

// tusOptionsHandler handles OPTIONS requests for capability discovery
func (fs *FilerServer) tusOptionsHandler(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Tus-Version", TusVersion)
    w.Header().Set("Tus-Extension", TusExtensions)
    w.Header().Set("Tus-Max-Size", strconv.FormatInt(TusMaxSize, 10))
    w.WriteHeader(http.StatusOK)
}

// tusCreateHandler handles POST requests to create new uploads
func (fs *FilerServer) tusCreateHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    // Parse Upload-Length header (required)
    uploadLengthStr := r.Header.Get("Upload-Length")
    if uploadLengthStr == "" {
        http.Error(w, "Upload-Length header required", http.StatusBadRequest)
        return
    }
    uploadLength, err := strconv.ParseInt(uploadLengthStr, 10, 64)
    if err != nil || uploadLength < 0 {
        http.Error(w, "Invalid Upload-Length", http.StatusBadRequest)
        return
    }
    if uploadLength > TusMaxSize {
        http.Error(w, "Upload-Length exceeds maximum", http.StatusRequestEntityTooLarge)
        return
    }

    // Parse Upload-Metadata header (optional)
    metadata := parseTusMetadata(r.Header.Get("Upload-Metadata"))

    // TusBasePath is pre-normalized in filer_server.go (leading slash, no trailing slash)
    tusPrefix := fs.option.TusBasePath

    // Determine target path from request URL
    targetPath := strings.TrimPrefix(r.URL.Path, tusPrefix)
    if targetPath == "" || targetPath == "/" {
        http.Error(w, "Target path required", http.StatusBadRequest)
        return
    }

    // Generate upload ID
    uploadID := uuid.New().String()

    // Create upload session
    session, err := fs.createTusSession(ctx, uploadID, targetPath, uploadLength, metadata)
    if err != nil {
        glog.Errorf("Failed to create TUS session: %v", err)
        http.Error(w, "Failed to create upload", http.StatusInternalServerError)
        return
    }

    // Build upload location URL (ensure it starts with single /)
    uploadLocation := path.Clean(fmt.Sprintf("%s/.uploads/%s", tusPrefix, uploadID))
    if !strings.HasPrefix(uploadLocation, "/") {
        uploadLocation = "/" + uploadLocation
    }

    // Handle creation-with-upload extension
    // TUS requires Content-Length for uploads; reject chunked encoding
    if r.Header.Get("Content-Type") == "application/offset+octet-stream" {
        if r.ContentLength < 0 {
            fs.deleteTusSession(ctx, uploadID)
            http.Error(w, "Content-Length header required for creation-with-upload", http.StatusBadRequest)
            return
        }
        if r.ContentLength > 0 {
            // Upload data in the creation request
            bytesWritten, uploadErr := fs.tusWriteData(ctx, session, 0, r.Body, r.ContentLength)
            if uploadErr != nil {
                // Cleanup session on failure
                fs.deleteTusSession(ctx, uploadID)
                if errors.Is(uploadErr, ErrContentTooLarge) {
                    http.Error(w, "Content-Length exceeds declared upload size", http.StatusRequestEntityTooLarge)
                    return
                }
                glog.Errorf("Failed to write initial TUS data: %v", uploadErr)
                http.Error(w, "Failed to write data", http.StatusInternalServerError)
                return
            }
            // Update offset in response header
            w.Header().Set("Upload-Offset", strconv.FormatInt(bytesWritten, 10))
            // Check if upload is complete
            if bytesWritten == session.Size {
                // Refresh session to get updated chunks
                session, err = fs.getTusSession(ctx, uploadID)
                if err != nil {
                    glog.Errorf("Failed to get updated TUS session: %v", err)
                    http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
                    return
                }
                if err := fs.completeTusUpload(ctx, session); err != nil {
                    glog.Errorf("Failed to complete TUS upload: %v", err)
                    http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
                    return
                }
            }
        }
        // ContentLength == 0 is allowed, just proceed to respond
    }

    w.Header().Set("Location", uploadLocation)
    w.WriteHeader(http.StatusCreated)
}

// tusHeadHandler handles HEAD requests to get current upload offset
func (fs *FilerServer) tusHeadHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
    ctx := r.Context()

    session, err := fs.getTusSession(ctx, uploadID)
    if err != nil {
        http.Error(w, "Upload not found", http.StatusNotFound)
        return
    }

    w.Header().Set("Upload-Offset", strconv.FormatInt(session.Offset, 10))
    w.Header().Set("Upload-Length", strconv.FormatInt(session.Size, 10))
    w.Header().Set("Cache-Control", "no-store")
    w.WriteHeader(http.StatusOK)
}

// tusPatchHandler handles PATCH requests to upload data
func (fs *FilerServer) tusPatchHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
    ctx := r.Context()

    // Validate Content-Type
    contentType := r.Header.Get("Content-Type")
    if contentType != "application/offset+octet-stream" {
        http.Error(w, "Content-Type must be application/offset+octet-stream", http.StatusUnsupportedMediaType)
        return
    }

    // Get current session
    session, err := fs.getTusSession(ctx, uploadID)
    if err != nil {
        http.Error(w, "Upload not found", http.StatusNotFound)
        return
    }

    // Validate Upload-Offset header
    uploadOffsetStr := r.Header.Get("Upload-Offset")
    if uploadOffsetStr == "" {
        http.Error(w, "Upload-Offset header required", http.StatusBadRequest)
        return
    }
    uploadOffset, err := strconv.ParseInt(uploadOffsetStr, 10, 64)
    if err != nil || uploadOffset < 0 {
        http.Error(w, "Invalid Upload-Offset", http.StatusBadRequest)
        return
    }

    // Check offset matches current position
    if uploadOffset != session.Offset {
        http.Error(w, fmt.Sprintf("Offset mismatch: expected %d, got %d", session.Offset, uploadOffset), http.StatusConflict)
        return
    }

    // TUS requires Content-Length header for PATCH requests
    if r.ContentLength < 0 {
        http.Error(w, "Content-Length header required", http.StatusBadRequest)
        return
    }

    // Write data
    bytesWritten, err := fs.tusWriteData(ctx, session, uploadOffset, r.Body, r.ContentLength)
    if err != nil {
        if errors.Is(err, ErrContentTooLarge) {
            http.Error(w, "Content-Length exceeds remaining upload size", http.StatusRequestEntityTooLarge)
            return
        }
        glog.Errorf("Failed to write TUS data: %v", err)
        http.Error(w, "Failed to write data", http.StatusInternalServerError)
        return
    }

    newOffset := uploadOffset + bytesWritten

    // Check if upload is complete
    if newOffset == session.Size {
        // Refresh session to get updated chunks
        session, err = fs.getTusSession(ctx, uploadID)
        if err != nil {
            glog.Errorf("Failed to get updated TUS session: %v", err)
            http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
            return
        }
        if err := fs.completeTusUpload(ctx, session); err != nil {
            glog.Errorf("Failed to complete TUS upload: %v", err)
            http.Error(w, "Failed to complete upload", http.StatusInternalServerError)
            return
        }
    }

    w.Header().Set("Upload-Offset", strconv.FormatInt(newOffset, 10))
    w.WriteHeader(http.StatusNoContent)
}

// tusDeleteHandler handles DELETE requests to cancel uploads
func (fs *FilerServer) tusDeleteHandler(w http.ResponseWriter, r *http.Request, uploadID string) {
    ctx := r.Context()
    if err := fs.deleteTusSession(ctx, uploadID); err != nil {
        glog.Errorf("Failed to delete TUS session: %v", err)
        http.Error(w, "Failed to delete upload", http.StatusInternalServerError)
        return
    }
    w.WriteHeader(http.StatusNoContent)
}

// tusChunkSize is the size of sub-chunks used when streaming uploads to volume servers.
// 4MB balances memory usage (avoiding buffering large TUS chunks) with upload efficiency
// (minimizing the number of volume server requests). Smaller values reduce memory but
// increase request overhead; larger values do the opposite.
const tusChunkSize = 4 * 1024 * 1024 // 4MB

// ErrContentTooLarge is returned when Content-Length exceeds remaining upload space
var ErrContentTooLarge = fmt.Errorf("content length exceeds remaining upload size")

// tusWriteData uploads data to volume servers in streaming chunks and updates session
// It reads data in fixed-size sub-chunks to avoid buffering large TUS chunks entirely in memory
func (fs *FilerServer) tusWriteData(ctx context.Context, session *TusSession, offset int64, reader io.Reader, contentLength int64) (int64, error) {
    if contentLength == 0 {
        return 0, nil
    }

    // Check if content length exceeds remaining size - return error instead of silently truncating
    remaining := session.Size - offset
    if contentLength > remaining {
        return 0, ErrContentTooLarge
    }
    if remaining <= 0 {
        return 0, nil
    }

    // Determine storage options based on target path
    so, err := fs.detectStorageOption0(ctx, session.TargetPath, "", "", "", "", "", "", "", "", "")
    if err != nil {
        return 0, fmt.Errorf("detect storage option: %w", err)
    }

    // Read first bytes for MIME type detection
    sniffSize := int64(512)
    if contentLength < sniffSize {
        sniffSize = contentLength
    }
    sniffBuf := make([]byte, sniffSize)
    sniffN, sniffErr := io.ReadFull(reader, sniffBuf)
    if sniffErr != nil && sniffErr != io.EOF && sniffErr != io.ErrUnexpectedEOF {
        return 0, fmt.Errorf("read data for mime detection: %w", sniffErr)
    }
    if sniffN == 0 {
        return 0, nil
    }
    sniffBuf = sniffBuf[:sniffN]
    mimeType := http.DetectContentType(sniffBuf)

    // Create a combined reader with sniffed bytes prepended
    var dataReader io.Reader
    if int64(sniffN) >= contentLength {
        dataReader = bytes.NewReader(sniffBuf)
    } else {
        dataReader = io.MultiReader(bytes.NewReader(sniffBuf), io.LimitReader(reader, contentLength-int64(sniffN)))
    }

    // Upload in streaming chunks to avoid buffering entire content in memory
    var totalWritten int64
    var uploadErr error
    var uploadedChunks []*TusChunkInfo

    // Create one uploader for all sub-chunks to reuse HTTP client connections
    uploader, uploaderErr := operation.NewUploader()
    if uploaderErr != nil {
        return 0, fmt.Errorf("create uploader: %w", uploaderErr)
    }

    chunkBuf := make([]byte, tusChunkSize)
    currentOffset := offset

    for totalWritten < contentLength {
        // Read up to tusChunkSize bytes
        readSize := int64(tusChunkSize)
        if contentLength-totalWritten < readSize {
            readSize = contentLength - totalWritten
        }
        n, readErr := io.ReadFull(dataReader, chunkBuf[:readSize])
        if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
            uploadErr = fmt.Errorf("read chunk data: %w", readErr)
            break
        }
        if n == 0 {
            break
        }
        chunkData := chunkBuf[:n]

        // Assign file ID from master for this sub-chunk
        fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(ctx, so)
        if assignErr != nil {
            uploadErr = fmt.Errorf("assign volume: %w", assignErr)
            break
        }

        // Upload to volume server using BytesReader (avoids double buffering in uploader)
        uploadResult, uploadResultErr, _ := uploader.Upload(ctx, util.NewBytesReader(chunkData), &operation.UploadOption{
            UploadUrl:         urlLocation,
            Filename:          "",
            Cipher:            fs.option.Cipher,
            IsInputCompressed: false,
            MimeType:          mimeType,
            PairMap:           nil,
            Jwt:               auth,
        })
        if uploadResultErr != nil {
            uploadErr = fmt.Errorf("upload data: %w", uploadResultErr)
            break
        }

        // Create chunk info and save it
        chunk := &TusChunkInfo{
            Offset:   currentOffset,
            Size:     int64(uploadResult.Size),
            FileId:   fileId,
            UploadAt: time.Now().UnixNano(),
        }
        if saveErr := fs.saveTusChunk(ctx, session.ID, chunk); saveErr != nil {
            // Cleanup this chunk on failure
            fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), []*filer_pb.FileChunk{
                {FileId: fileId},
            })
            uploadErr = fmt.Errorf("update session: %w", saveErr)
            break
        }

        uploadedChunks = append(uploadedChunks, chunk)
        totalWritten += int64(uploadResult.Size)
        currentOffset += int64(uploadResult.Size)
        stats.FilerHandlerCounter.WithLabelValues("tusUploadChunk").Inc()
    }

    if uploadErr != nil {
        // Cleanup all uploaded chunks on error
        if len(uploadedChunks) > 0 {
            var chunksToDelete []*filer_pb.FileChunk
            for _, c := range uploadedChunks {
                chunksToDelete = append(chunksToDelete, &filer_pb.FileChunk{FileId: c.FileId})
            }
            fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), chunksToDelete)
        }
        return 0, uploadErr
    }

    return totalWritten, nil
}

// parseTusMetadata parses the Upload-Metadata header
// Format: key1 base64value1,key2 base64value2,...
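// Example: "filename ZXhhbXBsZS50eHQ=,filetype dGV4dC9wbGFpbg==" decodes to
// {"filename": "example.txt", "filetype": "text/plain"}.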
func parseTusMetadata(header string) map[string]string {
    metadata := make(map[string]string)
    if header == "" {
        return metadata
    }
    pairs := strings.Split(header, ",")
    for _, pair := range pairs {
        pair = strings.TrimSpace(pair)
        parts := strings.SplitN(pair, " ", 2)
        if len(parts) != 2 {
            continue
        }
        key := strings.TrimSpace(parts[0])
        encodedValue := strings.TrimSpace(parts[1])
        value, err := base64.StdEncoding.DecodeString(encodedValue)
        if err != nil {
            glog.V(1).Infof("Failed to decode TUS metadata value for key %s: %v", key, err)
            continue
        }
        metadata[key] = string(value)
    }
    return metadata
}