seaweedFS/weed/server/filer_server_tus_session.go
Chris Lu 1b1e5f69a2 Add TUS protocol support for resumable uploads (#7592)
* Add TUS protocol integration tests

This commit adds integration tests for the TUS (resumable upload) protocol
in preparation for implementing TUS support in the filer.

Test coverage includes:
- OPTIONS handler for capability discovery
- Basic single-request upload
- Chunked/resumable uploads
- HEAD requests for offset tracking
- DELETE for upload cancellation
- Error handling (invalid offsets, missing uploads)
- Creation-with-upload extension
- Resume after interruption simulation

Tests are skipped in short mode and require a running SeaweedFS cluster.

* Add TUS session storage types and utilities

Implements TUS upload session management:
- TusSession struct for tracking upload state
- Session creation with directory-based storage
- Session persistence using filer entries
- Session retrieval and offset updates
- Session deletion with chunk cleanup
- Upload completion with chunk assembly into final file

Session data is stored in /.uploads.tus/{upload-id}/ directory,
following the pattern used by S3 multipart uploads.

* Add TUS HTTP handlers

Implements TUS protocol HTTP handlers:
- tusHandler: Main entry point routing requests
- tusOptionsHandler: Capability discovery (OPTIONS)
- tusCreateHandler: Create new upload (POST)
- tusHeadHandler: Get upload offset (HEAD)
- tusPatchHandler: Upload data at offset (PATCH)
- tusDeleteHandler: Cancel upload (DELETE)
- tusWriteData: Upload data to volume servers

Features:
- Supports creation-with-upload extension
- Validates TUS protocol headers
- Offset conflict detection
- Automatic upload completion when size is reached
- Metadata parsing from Upload-Metadata header
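The TUS 1.0 spec defines `Upload-Metadata` as comma-separated pairs, each pair being a key, a space, and a Base64-encoded value (the value may be omitted). A minimal standalone sketch of such a parser, not necessarily the filer's actual implementation:

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strings"
)

// parseUploadMetadata decodes a TUS Upload-Metadata header value into a map.
// Pairs are comma-separated; key and Base64 value are separated by a space.
func parseUploadMetadata(header string) (map[string]string, error) {
	meta := map[string]string{}
	if header == "" {
		return meta, nil
	}
	for _, pair := range strings.Split(header, ",") {
		pair = strings.TrimSpace(pair)
		if pair == "" {
			continue
		}
		// A pair without a value decodes to the empty string.
		key, encoded, _ := strings.Cut(pair, " ")
		value, err := base64.StdEncoding.DecodeString(encoded)
		if err != nil {
			return nil, fmt.Errorf("invalid metadata value for %q: %w", key, err)
		}
		meta[key] = string(value)
	}
	return meta, nil
}

func main() {
	m, err := parseUploadMetadata("filename ZGVtby50eHQ=,content-type dGV4dC9wbGFpbg==")
	fmt.Println(m["filename"], m["content-type"], err)
}
```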

* Wire up TUS protocol routes in filer server

Add TUS handler route (/.tus/) to the filer HTTP server.
The TUS route is registered before the catch-all route to ensure
proper routing of TUS protocol requests.

TUS protocol is now accessible at:
- OPTIONS /.tus/ - Capability discovery
- POST /.tus/{path} - Create upload
- HEAD /.tus/.uploads/{id} - Get offset
- PATCH /.tus/.uploads/{id} - Upload data
- DELETE /.tus/.uploads/{id} - Cancel upload
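A client following the route layout above would build requests like the following sketch. The filer address, target path, and upload ID are illustrative assumptions; only the headers (`Tus-Resumable`, `Upload-Length`, `Upload-Offset`, `application/offset+octet-stream`) come from the TUS 1.0 protocol:

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strconv"
)

// buildTusCreate prepares the POST that starts an upload for targetPath.
func buildTusCreate(base, targetPath string, size int64) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, base+targetPath, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Tus-Resumable", "1.0.0")
	req.Header.Set("Upload-Length", strconv.FormatInt(size, 10))
	return req, nil
}

// buildTusPatch prepares a data write at the given offset against the
// upload URL returned in the creation response's Location header.
func buildTusPatch(uploadURL string, offset int64, data []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPatch, uploadURL, bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Tus-Resumable", "1.0.0")
	req.Header.Set("Upload-Offset", strconv.FormatInt(offset, 10))
	req.Header.Set("Content-Type", "application/offset+octet-stream")
	return req, nil
}

func main() {
	create, _ := buildTusCreate("http://localhost:8888/.tus", "/buckets/demo/file.bin", 1024)
	patch, _ := buildTusPatch("http://localhost:8888/.tus/.uploads/abc123", 0, []byte("hello"))
	fmt.Println(create.Method, create.Header.Get("Upload-Length"))
	fmt.Println(patch.Method, patch.Header.Get("Upload-Offset"))
}
```

Sending the requests with `http.DefaultClient.Do` requires a running filer; the builders alone show the protocol shape.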

* Improve TUS integration test setup

Add comprehensive Makefile for TUS tests with targets:
- test-with-server: Run tests with automatic server management
- test-basic/chunked/resume/errors: Specific test categories
- manual-start/stop: For development testing
- debug-logs/status: For debugging
- ci-test: For CI/CD pipelines

Update README.md with:
- Detailed TUS protocol documentation
- All endpoint descriptions with headers
- Usage examples with curl commands
- Architecture diagram
- Comparison with S3 multipart uploads

Follows the pattern established by other tests in test/ folder.

* Fix TUS integration tests and creation-with-upload

- Fix test URLs to use full URLs instead of relative paths
- Fix creation-with-upload to refresh session before completing
- Fix Makefile to properly handle test cleanup
- Add FullURL helper function to TestCluster

* Add TUS protocol tests to GitHub Actions CI

- Add tus-tests.yml workflow that runs on PRs and pushes
- Runs when TUS-related files are modified
- Automatic server management for integration testing
- Upload logs on failure for debugging

* Make TUS base path configurable via CLI

- Add -tus.path CLI flag to filer command
- TUS is disabled by default (empty path)
- Example: -tus.path=/.tus to enable at /.tus endpoint
- Update test Makefile to use -tus.path flag
- Update README with TUS enabling instructions

* Rename -tus.path to -tusBasePath with default .tus

- Rename CLI flag from -tus.path to -tusBasePath
- Default to .tus (TUS enabled by default)
- Add -filer.tusBasePath option to weed server command
- Properly handle path prefix (prepend / if missing)

* Address code review comments

- Sort chunks by offset before assembling final file
- Use chunk.Offset directly instead of recalculating
- Return error on invalid file ID instead of skipping
- Require Content-Length header for PATCH requests
- Use fs.option.Cipher for encryption setting
- Detect MIME type from data using http.DetectContentType
- Fix concurrency group for push events in workflow
- Use os.Interrupt instead of Kill for graceful shutdown in tests

* fmt

* Address remaining code review comments

- Fix potential open redirect vulnerability by sanitizing uploadLocation path
- Add language specifier to README code block
- Handle os.Create errors in test setup
- Use waitForHTTPServer instead of time.Sleep for master/volume readiness
- Improve test reliability and debugging

* Address critical and high-priority review comments

- Add per-session locking to prevent race conditions in updateTusSessionOffset
- Stream data directly to volume server instead of buffering entire chunk
- Only buffer 512 bytes for MIME type detection, then stream remaining data
- Clean up session locks when session is deleted

* Fix race condition to work across multiple filer instances

- Store each chunk as a separate file entry instead of updating session JSON
- Chunk file names encode offset, size, and fileId for atomic storage
- getTusSession loads chunks from directory listing (atomic read)
- Eliminates read-modify-write race condition across multiple filers
- Remove in-memory mutex that only worked for single filer instance
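The filename scheme can be shown as a standalone round trip. This sketch mirrors the `tusChunkPath`/`parseTusChunkPath` pair in the source below: the fileId is URL-safe Base64 (no padding) so slashes and underscores in volume fileIds survive the `_`-delimited name:

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
)

// chunkName encodes offset, size, and fileId into one atomic filename.
func chunkName(offset, size int64, fileId string) string {
	encoded := base64.RawURLEncoding.EncodeToString([]byte(fileId))
	return fmt.Sprintf("chunk_%016d_%016d_%s", offset, size, encoded)
}

// parseChunkName reverses chunkName. strings.Cut takes only the first two
// underscores, so underscores inside the Base64 part are preserved.
func parseChunkName(name string) (offset, size int64, fileId string, err error) {
	s, ok := strings.CutPrefix(name, "chunk_")
	if !ok {
		return 0, 0, "", fmt.Errorf("not a chunk file: %s", name)
	}
	offsetStr, rest, _ := strings.Cut(s, "_")
	sizeStr, encoded, _ := strings.Cut(rest, "_")
	if offset, err = strconv.ParseInt(offsetStr, 10, 64); err != nil {
		return 0, 0, "", err
	}
	if size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil {
		return 0, 0, "", err
	}
	raw, err := base64.RawURLEncoding.DecodeString(encoded)
	if err != nil {
		return 0, 0, "", err
	}
	return offset, size, string(raw), nil
}

func main() {
	// "3,0163d1cb29" is a made-up fileId in SeaweedFS's volume,key_cookie shape.
	name := chunkName(4194304, 1048576, "3,0163d1cb29")
	off, sz, fid, _ := parseChunkName(name)
	fmt.Println(off, sz, fid)
}
```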

* Address code review comments: fix variable shadowing, sniff size, and test stability

- Rename path variable to reqPath to avoid shadowing path package
- Make sniff buffer size respect contentLength (read at most contentLength bytes)
- Handle Content-Length < 0 in creation-with-upload (return error for chunked encoding)
- Fix test cluster: use temp directory for filer store, add startup delay

* Fix test stability: increase cluster stabilization delay to 5 seconds

The tests were intermittently failing because the volume server needed more
time to create volumes and register with the master. Increasing the delay
from 2 to 5 seconds fixes the flaky test behavior.

* Address PR review comments for TUS protocol support

- Fix strconv.Atoi error handling in test file (lines 386, 747)
- Fix lossy fileId encoding: use base64 instead of underscore replacement
- Add pagination support for ListDirectoryEntries in getTusSession
- Batch delete chunks instead of one-by-one in deleteTusSession

* Address additional PR review comments for TUS protocol

- Fix UploadAt timestamp: use entry.Crtime instead of time.Now()
- Remove redundant JSON content in chunk entry (metadata in filename)
- Refactor tusWriteData to stream in 4MB chunks to avoid OOM on large uploads
- Pass filer.Entry to parseTusChunkPath to preserve actual upload time

* Address more PR review comments for TUS protocol

- Normalize TUS path once in filer_server.go, store in option.TusPath
- Remove redundant path normalization from TUS handlers
- Remove goto statement in tusCreateHandler, simplify control flow

* Remove unnecessary mutexes in tusWriteData

The upload loop is sequential, so uploadErrLock and chunksLock are not needed.

* Rename updateTusSessionOffset to saveTusChunk

Remove unused newOffset parameter and rename function to better reflect its purpose.

* Improve TUS upload performance and add path validation

- Reuse operation.Uploader across sub-chunks for better connection reuse
- Guard against TusPath='/' to prevent hijacking all filer routes

* Address PR review comments for TUS protocol

- Fix critical chunk filename parsing: use strings.Cut instead of SplitN
  to correctly handle base64-encoded fileIds that may contain underscores
- Rename tusPath to tusBasePath for naming consistency across codebase
- Add background garbage collection for expired TUS sessions (runs hourly)
- Improve error messages with %w wrapping for better debuggability

* Address additional TUS PR review comments

- Fix tusBasePath default to use leading slash (/.tus) for consistency
- Add chunk contiguity validation in completeTusUpload to detect gaps/overlaps
- Fix offset calculation to find maximum contiguous range from 0, not just last chunk
- Return 413 Request Entity Too Large instead of silently truncating content
- Document tusChunkSize rationale (4MB balances memory vs request overhead)
- Fix Makefile xargs portability by removing GNU-specific -r flag
- Add explicit -tusBasePath flag to integration test for robustness
- Fix README example to use /.uploads/tus path format
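The "maximum contiguous range from 0" offset calculation can be isolated as a small function, matching the logic in `getTusSession` below; the `chunk` struct here is a simplified stand-in for `TusChunkInfo`:

```go
package main

import (
	"fmt"
	"sort"
)

type chunk struct{ Offset, Size int64 }

// contiguousEnd returns the length of the maximal gap-free byte range
// starting at offset 0 - what a TUS HEAD response reports as Upload-Offset.
func contiguousEnd(chunks []chunk) int64 {
	sort.Slice(chunks, func(i, j int) bool { return chunks[i].Offset < chunks[j].Offset })
	var end int64
	for _, c := range chunks {
		if c.Offset > end {
			break // gap detected: chunks past it don't count yet
		}
		if c.Offset+c.Size > end {
			end = c.Offset + c.Size
		}
	}
	return end
}

func main() {
	// Chunks [0,4) and [4,8) are contiguous; [12,16) sits after a gap.
	fmt.Println(contiguousEnd([]chunk{{12, 4}, {0, 4}, {4, 4}}))
}
```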

* Revert log_buffer changes (moved to separate PR)

* Minor style fixes from PR review

- Simplify tusBasePath flag description to use example format
- Add 'TUS upload' prefix to session not found error message
- Remove duplicate tusChunkSize comment
- Capitalize warning message for consistency
- Add grep filter to Makefile xargs for better empty input handling
2025-12-14 21:56:07 -08:00


package weed_server

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

const (
	TusVersion       = "1.0.0"
	TusMaxSize       = int64(5 * 1024 * 1024 * 1024) // 5GB default max size
	TusUploadsFolder = ".uploads.tus"
	TusInfoFileName  = ".info"
	TusChunkExt      = ".chunk"
	TusExtensions    = "creation,creation-with-upload,termination"
)

// TusSession represents an in-progress TUS upload session
type TusSession struct {
	ID         string            `json:"id"`
	TargetPath string            `json:"target_path"`
	Size       int64             `json:"size"`
	Offset     int64             `json:"offset"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	CreatedAt  time.Time         `json:"created_at"`
	ExpiresAt  time.Time         `json:"expires_at,omitempty"`
	Chunks     []*TusChunkInfo   `json:"chunks,omitempty"`
}

// TusChunkInfo tracks individual chunk uploads within a session
type TusChunkInfo struct {
	Offset   int64  `json:"offset"`
	Size     int64  `json:"size"`
	FileId   string `json:"file_id"`
	UploadAt int64  `json:"upload_at"`
}

// tusSessionDir returns the directory path for storing TUS upload sessions
func (fs *FilerServer) tusSessionDir() string {
	return "/" + TusUploadsFolder
}

// tusSessionPath returns the path to a specific upload session directory
func (fs *FilerServer) tusSessionPath(uploadID string) string {
	return fmt.Sprintf("/%s/%s", TusUploadsFolder, uploadID)
}

// tusSessionInfoPath returns the path to the session info file
func (fs *FilerServer) tusSessionInfoPath(uploadID string) string {
	return fmt.Sprintf("/%s/%s/%s", TusUploadsFolder, uploadID, TusInfoFileName)
}

// tusChunkPath returns the path to store a chunk info file
// Format: /{TusUploadsFolder}/{uploadID}/chunk_{offset}_{size}_{encodedFileId}
func (fs *FilerServer) tusChunkPath(uploadID string, offset, size int64, fileId string) string {
	// Use URL-safe base64 encoding to safely encode fileId (handles both / and _ in fileId)
	encodedFileId := base64.RawURLEncoding.EncodeToString([]byte(fileId))
	return fmt.Sprintf("/%s/%s/chunk_%016d_%016d_%s", TusUploadsFolder, uploadID, offset, size, encodedFileId)
}

// parseTusChunkPath parses chunk info from a chunk entry
// The entry's Crtime is used for the UploadAt timestamp to preserve the actual upload time
func parseTusChunkPath(entry *filer.Entry) (*TusChunkInfo, error) {
	name := entry.Name()
	if !strings.HasPrefix(name, "chunk_") {
		return nil, fmt.Errorf("not a chunk file: %s", name)
	}
	// Use strings.Cut to correctly handle base64-encoded fileId which may contain underscores
	s := name[6:] // Skip "chunk_" prefix
	offsetStr, rest, found := strings.Cut(s, "_")
	if !found {
		return nil, fmt.Errorf("invalid chunk file name format (missing offset): %s", name)
	}
	sizeStr, encodedFileId, found := strings.Cut(rest, "_")
	if !found {
		return nil, fmt.Errorf("invalid chunk file name format (missing size): %s", name)
	}
	offset, err := strconv.ParseInt(offsetStr, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid offset in chunk file %q: %w", name, err)
	}
	size, err := strconv.ParseInt(sizeStr, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("invalid size in chunk file %q: %w", name, err)
	}
	// Decode fileId from URL-safe base64
	fileIdBytes, err := base64.RawURLEncoding.DecodeString(encodedFileId)
	if err != nil {
		return nil, fmt.Errorf("invalid fileId encoding in chunk file %q: %w", name, err)
	}
	return &TusChunkInfo{
		Offset:   offset,
		Size:     size,
		FileId:   string(fileIdBytes),
		UploadAt: entry.Crtime.UnixNano(),
	}, nil
}

// createTusSession creates a new TUS upload session
func (fs *FilerServer) createTusSession(ctx context.Context, uploadID, targetPath string, size int64, metadata map[string]string) (*TusSession, error) {
	session := &TusSession{
		ID:         uploadID,
		TargetPath: targetPath,
		Size:       size,
		Offset:     0,
		Metadata:   metadata,
		CreatedAt:  time.Now(),
		ExpiresAt:  time.Now().Add(7 * 24 * time.Hour), // 7 days default expiration
		Chunks:     []*TusChunkInfo{},
	}
	// Create session directory
	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
	if err := fs.filer.CreateEntry(ctx, &filer.Entry{
		FullPath: sessionDirPath,
		Attr: filer.Attr{
			Mode:   os.ModeDir | 0755,
			Crtime: time.Now(),
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
	}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return nil, fmt.Errorf("create session directory: %w", err)
	}
	// Save session info
	if err := fs.saveTusSession(ctx, session); err != nil {
		// Cleanup the directory on failure
		fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0)
		return nil, fmt.Errorf("save session info: %w", err)
	}
	glog.V(2).Infof("Created TUS session %s for %s, size=%d", uploadID, targetPath, size)
	return session, nil
}

// saveTusSession saves the session info to the filer
func (fs *FilerServer) saveTusSession(ctx context.Context, session *TusSession) error {
	sessionData, err := json.Marshal(session)
	if err != nil {
		return fmt.Errorf("marshal session: %w", err)
	}
	infoPath := util.FullPath(fs.tusSessionInfoPath(session.ID))
	entry := &filer.Entry{
		FullPath: infoPath,
		Attr: filer.Attr{
			Mode:   0644,
			Crtime: session.CreatedAt,
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
		Content: sessionData,
	}
	if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return fmt.Errorf("save session info entry: %w", err)
	}
	return nil
}

// getTusSession retrieves a TUS session by upload ID, including chunks from directory listing
func (fs *FilerServer) getTusSession(ctx context.Context, uploadID string) (*TusSession, error) {
	infoPath := util.FullPath(fs.tusSessionInfoPath(uploadID))
	entry, err := fs.filer.FindEntry(ctx, infoPath)
	if err != nil {
		if err == filer_pb.ErrNotFound {
			return nil, fmt.Errorf("TUS upload session not found: %s", uploadID)
		}
		return nil, fmt.Errorf("find session: %w", err)
	}
	var session TusSession
	if err := json.Unmarshal(entry.Content, &session); err != nil {
		return nil, fmt.Errorf("unmarshal session: %w", err)
	}
	// Load chunks from directory listing with pagination (atomic read, no race condition)
	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
	session.Chunks = nil
	session.Offset = 0
	lastFileName := ""
	pageSize := 1000
	for {
		entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, sessionDirPath, lastFileName, false, int64(pageSize), "", "", "")
		if err != nil {
			return nil, fmt.Errorf("list session directory: %w", err)
		}
		for _, e := range entries {
			if strings.HasPrefix(e.Name(), "chunk_") {
				chunk, parseErr := parseTusChunkPath(e)
				if parseErr != nil {
					glog.V(1).Infof("Skipping invalid chunk file %s: %v", e.Name(), parseErr)
					continue
				}
				session.Chunks = append(session.Chunks, chunk)
			}
			lastFileName = e.Name()
		}
		if !hasMore || len(entries) < pageSize {
			break
		}
	}
	// Sort chunks by offset and compute current offset as maximum contiguous range from 0
	if len(session.Chunks) > 0 {
		sort.Slice(session.Chunks, func(i, j int) bool {
			return session.Chunks[i].Offset < session.Chunks[j].Offset
		})
		// Compute the maximum contiguous offset from 0
		// This correctly handles gaps in the upload sequence
		contiguousEnd := int64(0)
		for _, chunk := range session.Chunks {
			if chunk.Offset > contiguousEnd {
				// Gap detected, stop at the first gap
				break
			}
			chunkEnd := chunk.Offset + chunk.Size
			if chunkEnd > contiguousEnd {
				contiguousEnd = chunkEnd
			}
		}
		session.Offset = contiguousEnd
	}
	return &session, nil
}

// saveTusChunk stores the chunk info as a separate file entry
// This avoids read-modify-write race conditions across multiple filer instances
// The chunk metadata is encoded in the filename; the entry's Crtime preserves upload time
func (fs *FilerServer) saveTusChunk(ctx context.Context, uploadID string, chunk *TusChunkInfo) error {
	if chunk == nil {
		return nil
	}
	// Store chunk info as a separate file entry (atomic operation)
	// Chunk metadata is encoded in the filename; Crtime is used for UploadAt when reading back
	chunkPath := util.FullPath(fs.tusChunkPath(uploadID, chunk.Offset, chunk.Size, chunk.FileId))
	if err := fs.filer.CreateEntry(ctx, &filer.Entry{
		FullPath: chunkPath,
		Attr: filer.Attr{
			Mode:   0644,
			Crtime: time.Now(),
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
		},
	}, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return fmt.Errorf("save chunk info: %w", err)
	}
	return nil
}

// deleteTusSession removes a TUS upload session and all its data
func (fs *FilerServer) deleteTusSession(ctx context.Context, uploadID string) error {
	session, err := fs.getTusSession(ctx, uploadID)
	if err != nil {
		// Session might already be deleted or never existed
		glog.V(1).Infof("TUS session %s not found for deletion: %v", uploadID, err)
		return nil
	}
	// Batch delete all uploaded chunks from volume servers
	if len(session.Chunks) > 0 {
		var chunksToDelete []*filer_pb.FileChunk
		for _, chunk := range session.Chunks {
			if chunk.FileId != "" {
				chunksToDelete = append(chunksToDelete, &filer_pb.FileChunk{FileId: chunk.FileId})
			}
		}
		if len(chunksToDelete) > 0 {
			fs.filer.DeleteChunks(ctx, util.FullPath(session.TargetPath), chunksToDelete)
		}
	}
	// Delete the session directory
	sessionDirPath := util.FullPath(fs.tusSessionPath(uploadID))
	if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, true, false, false, nil, 0); err != nil {
		return fmt.Errorf("delete session directory: %w", err)
	}
	glog.V(2).Infof("Deleted TUS session %s", uploadID)
	return nil
}

// completeTusUpload assembles all chunks and creates the final file
func (fs *FilerServer) completeTusUpload(ctx context.Context, session *TusSession) error {
	if session.Offset != session.Size {
		return fmt.Errorf("upload incomplete: offset=%d, expected=%d", session.Offset, session.Size)
	}
	// Sort chunks by offset to ensure correct order
	sort.Slice(session.Chunks, func(i, j int) bool {
		return session.Chunks[i].Offset < session.Chunks[j].Offset
	})
	// Validate chunks are contiguous with no gaps or overlaps
	expectedOffset := int64(0)
	for _, chunk := range session.Chunks {
		if chunk.Offset != expectedOffset {
			return fmt.Errorf("chunk gap or overlap detected: expected offset %d, got %d", expectedOffset, chunk.Offset)
		}
		expectedOffset = chunk.Offset + chunk.Size
	}
	if expectedOffset != session.Size {
		return fmt.Errorf("chunks do not cover full file: chunks end at %d, expected %d", expectedOffset, session.Size)
	}
	// Assemble file chunks in order
	var fileChunks []*filer_pb.FileChunk
	for _, chunk := range session.Chunks {
		fid, fidErr := filer_pb.ToFileIdObject(chunk.FileId)
		if fidErr != nil {
			return fmt.Errorf("invalid file ID %s at offset %d: %w", chunk.FileId, chunk.Offset, fidErr)
		}
		fileChunk := &filer_pb.FileChunk{
			FileId:       chunk.FileId,
			Offset:       chunk.Offset,
			Size:         uint64(chunk.Size),
			ModifiedTsNs: chunk.UploadAt,
			Fid:          fid,
		}
		fileChunks = append(fileChunks, fileChunk)
	}
	// Determine content type from metadata
	contentType := ""
	if session.Metadata != nil {
		if ct, ok := session.Metadata["content-type"]; ok {
			contentType = ct
		}
	}
	// Create the final file entry
	targetPath := util.FullPath(session.TargetPath)
	entry := &filer.Entry{
		FullPath: targetPath,
		Attr: filer.Attr{
			Mode:   0644,
			Crtime: session.CreatedAt,
			Mtime:  time.Now(),
			Uid:    OS_UID,
			Gid:    OS_GID,
			Mime:   contentType,
		},
		Chunks: fileChunks,
	}
	// Ensure parent directory exists
	if err := fs.filer.CreateEntry(ctx, entry, false, false, nil, false, fs.filer.MaxFilenameLength); err != nil {
		return fmt.Errorf("create final file entry: %w", err)
	}
	// Delete the session (but keep the chunks since they're now part of the final file)
	sessionDirPath := util.FullPath(fs.tusSessionPath(session.ID))
	if err := fs.filer.DeleteEntryMetaAndData(ctx, sessionDirPath, true, false, false, false, nil, 0); err != nil {
		glog.V(1).Infof("Failed to cleanup TUS session directory %s: %v", session.ID, err)
	}
	glog.V(2).Infof("Completed TUS upload %s -> %s, size=%d, chunks=%d",
		session.ID, session.TargetPath, session.Size, len(fileChunks))
	return nil
}

// StartTusSessionCleanup starts a background goroutine that periodically cleans up expired TUS sessions
func (fs *FilerServer) StartTusSessionCleanup(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for range ticker.C {
			fs.cleanupExpiredTusSessions()
		}
	}()
	glog.V(0).Infof("TUS session cleanup started with interval %v", interval)
}

// cleanupExpiredTusSessions scans for and removes expired TUS upload sessions
func (fs *FilerServer) cleanupExpiredTusSessions() {
	ctx := context.Background()
	uploadsDir := util.FullPath(fs.tusSessionDir())
	// List all session directories under the TUS uploads folder
	var lastFileName string
	const pageSize = 100
	for {
		entries, hasMore, err := fs.filer.ListDirectoryEntries(ctx, uploadsDir, lastFileName, false, int64(pageSize), "", "", "")
		if err != nil {
			glog.V(1).Infof("TUS cleanup: failed to list sessions: %v", err)
			return
		}
		now := time.Now()
		for _, entry := range entries {
			if !entry.IsDirectory() {
				lastFileName = entry.Name()
				continue
			}
			uploadID := entry.Name()
			session, err := fs.getTusSession(ctx, uploadID)
			if err != nil {
				glog.V(2).Infof("TUS cleanup: skipping session %s: %v", uploadID, err)
				lastFileName = uploadID
				continue
			}
			if !session.ExpiresAt.IsZero() && now.After(session.ExpiresAt) {
				glog.V(1).Infof("TUS cleanup: removing expired session %s (expired at %v)", uploadID, session.ExpiresAt)
				if err := fs.deleteTusSession(ctx, uploadID); err != nil {
					glog.V(1).Infof("TUS cleanup: failed to delete session %s: %v", uploadID, err)
				}
			}
			lastFileName = uploadID
		}
		if !hasMore || len(entries) < pageSize {
			break
		}
	}
}