seaweedFS/weed/server/filer_server.go
Chris Lu 1b1e5f69a2 Add TUS protocol support for resumable uploads (#7592)
* Add TUS protocol integration tests

This commit adds integration tests for the TUS (resumable upload) protocol
in preparation for implementing TUS support in the filer.

Test coverage includes:
- OPTIONS handler for capability discovery
- Basic single-request upload
- Chunked/resumable uploads
- HEAD requests for offset tracking
- DELETE for upload cancellation
- Error handling (invalid offsets, missing uploads)
- Creation-with-upload extension
- Resume after interruption simulation

Tests are skipped in short mode and require a running SeaweedFS cluster.

* Add TUS session storage types and utilities

Implements TUS upload session management:
- TusSession struct for tracking upload state
- Session creation with directory-based storage
- Session persistence using filer entries
- Session retrieval and offset updates
- Session deletion with chunk cleanup
- Upload completion with chunk assembly into final file

Session data is stored in /.uploads.tus/{upload-id}/ directory,
following the pattern used by S3 multipart uploads.

* Add TUS HTTP handlers

Implements TUS protocol HTTP handlers:
- tusHandler: Main entry point routing requests
- tusOptionsHandler: Capability discovery (OPTIONS)
- tusCreateHandler: Create new upload (POST)
- tusHeadHandler: Get upload offset (HEAD)
- tusPatchHandler: Upload data at offset (PATCH)
- tusDeleteHandler: Cancel upload (DELETE)
- tusWriteData: Upload data to volume servers

Features:
- Supports creation-with-upload extension
- Validates TUS protocol headers
- Offset conflict detection
- Automatic upload completion when size is reached
- Metadata parsing from Upload-Metadata header

* Wire up TUS protocol routes in filer server

Add TUS handler route (/.tus/) to the filer HTTP server.
The TUS route is registered before the catch-all route to ensure
proper routing of TUS protocol requests.

TUS protocol is now accessible at:
- OPTIONS /.tus/ - Capability discovery
- POST /.tus/{path} - Create upload
- HEAD /.tus/.uploads/{id} - Get offset
- PATCH /.tus/.uploads/{id} - Upload data
- DELETE /.tus/.uploads/{id} - Cancel upload

* Improve TUS integration test setup

Add comprehensive Makefile for TUS tests with targets:
- test-with-server: Run tests with automatic server management
- test-basic/chunked/resume/errors: Specific test categories
- manual-start/stop: For development testing
- debug-logs/status: For debugging
- ci-test: For CI/CD pipelines

Update README.md with:
- Detailed TUS protocol documentation
- All endpoint descriptions with headers
- Usage examples with curl commands
- Architecture diagram
- Comparison with S3 multipart uploads

Follows the pattern established by other tests in test/ folder.

* Fix TUS integration tests and creation-with-upload

- Fix test URLs to use full URLs instead of relative paths
- Fix creation-with-upload to refresh session before completing
- Fix Makefile to properly handle test cleanup
- Add FullURL helper function to TestCluster

* Add TUS protocol tests to GitHub Actions CI

- Add tus-tests.yml workflow that runs on PRs and pushes
- Runs when TUS-related files are modified
- Automatic server management for integration testing
- Upload logs on failure for debugging

* Make TUS base path configurable via CLI

- Add -tus.path CLI flag to filer command
- TUS is disabled by default (empty path)
- Example: -tus.path=/.tus to enable at /.tus endpoint
- Update test Makefile to use -tus.path flag
- Update README with TUS enabling instructions

* Rename -tus.path to -tusBasePath with default .tus

- Rename CLI flag from -tus.path to -tusBasePath
- Default to .tus (TUS enabled by default)
- Add -filer.tusBasePath option to weed server command
- Properly handle path prefix (prepend / if missing)
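
The prefix handling can be sketched as a small standalone function. This is a minimal sketch that mirrors the normalization done inline in NewFilerServer in the file below; the helper name normalizeTusBasePath is hypothetical and does not exist in the codebase.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeTusBasePath is a hypothetical helper (not part of SeaweedFS).
// It prepends a leading slash, trims trailing slashes, and reports false
// when TUS should stay disabled (empty input or the root path "/").
func normalizeTusBasePath(p string) (string, bool) {
	if p == "" {
		return "", false // empty path: TUS is disabled
	}
	if !strings.HasPrefix(p, "/") {
		p = "/" + p
	}
	p = strings.TrimRight(p, "/")
	if p == "" {
		return "", false // "/" would hijack every filer route
	}
	return p, true
}

func main() {
	for _, in := range []string{".tus", "/.tus/", "/", ""} {
		p, ok := normalizeTusBasePath(in)
		fmt.Printf("%q -> %q enabled=%v\n", in, p, ok)
	}
}
```

Returning a boolean keeps the "disabled" decision next to the normalization, so a caller only registers the route when the second value is true.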

* Address code review comments

- Sort chunks by offset before assembling final file
- Use chunk.Offset directly instead of recalculating
- Return error on invalid file ID instead of skipping
- Require Content-Length header for PATCH requests
- Use fs.option.Cipher for encryption setting
- Detect MIME type from data using http.DetectContentType
- Fix concurrency group for push events in workflow
- Use os.Interrupt instead of Kill for graceful shutdown in tests

* fmt

* Address remaining code review comments

- Fix potential open redirect vulnerability by sanitizing uploadLocation path
- Add language specifier to README code block
- Handle os.Create errors in test setup
- Use waitForHTTPServer instead of time.Sleep for master/volume readiness
- Improve test reliability and debugging

* Address critical and high-priority review comments

- Add per-session locking to prevent race conditions in updateTusSessionOffset
- Stream data directly to volume server instead of buffering entire chunk
- Only buffer 512 bytes for MIME type detection, then stream remaining data
- Clean up session locks when session is deleted
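
The sniff-then-stream idea can be illustrated in isolation. The following is only a sketch under assumptions: sniffAndStream and demo are hypothetical names, and the real handler streams to a volume server rather than into memory. It reads at most 512 bytes for http.DetectContentType, then replays those bytes in front of the unread remainder with io.MultiReader.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// sniffAndStream (hypothetical) buffers at most 512 bytes, or fewer when
// contentLength is smaller, to detect the MIME type. It returns a reader
// that yields the sniffed bytes followed by the rest of body, so the
// payload is never held in memory as a whole.
func sniffAndStream(body io.Reader, contentLength int64) (string, io.Reader, error) {
	sniffSize := int64(512)
	if contentLength >= 0 && contentLength < sniffSize {
		sniffSize = contentLength
	}
	head := make([]byte, sniffSize)
	n, err := io.ReadFull(body, head)
	if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
		return "", nil, err
	}
	head = head[:n]
	mimeType := http.DetectContentType(head)
	return mimeType, io.MultiReader(bytes.NewReader(head), body), nil
}

// demo runs sniffAndStream over an in-memory payload and drains the result.
func demo(data []byte) (string, []byte, error) {
	mimeType, r, err := sniffAndStream(bytes.NewReader(data), int64(len(data)))
	if err != nil {
		return "", nil, err
	}
	rest, err := io.ReadAll(r)
	return mimeType, rest, err
}

func main() {
	mimeType, rest, err := demo([]byte("\x89PNG\r\n\x1a\n...image bytes..."))
	fmt.Println(mimeType, len(rest), err)
}
```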

* Fix race condition to work across multiple filer instances

- Store each chunk as a separate file entry instead of updating session JSON
- Chunk file names encode offset, size, and fileId for atomic storage
- getTusSession loads chunks from directory listing (atomic read)
- Eliminates read-modify-write race condition across multiple filers
- Remove in-memory mutex that only worked for single filer instance

* Address code review comments: fix variable shadowing, sniff size, and test stability

- Rename path variable to reqPath to avoid shadowing path package
- Make sniff buffer size respect contentLength (read at most contentLength bytes)
- Handle Content-Length < 0 in creation-with-upload (return error for chunked encoding)
- Fix test cluster: use temp directory for filer store, add startup delay

* Fix test stability: increase cluster stabilization delay to 5 seconds

The tests were intermittently failing because the volume server needed more
time to create volumes and register with the master. Increasing the delay
from 2 to 5 seconds fixes the flaky test behavior.

* Address PR review comments for TUS protocol support

- Fix strconv.Atoi error handling in test file (lines 386, 747)
- Fix lossy fileId encoding: use base64 instead of underscore replacement
- Add pagination support for ListDirectoryEntries in getTusSession
- Batch delete chunks instead of one-by-one in deleteTusSession

* Address additional PR review comments for TUS protocol

- Fix UploadAt timestamp: use entry.Crtime instead of time.Now()
- Remove redundant JSON content in chunk entry (metadata in filename)
- Refactor tusWriteData to stream in 4MB chunks to avoid OOM on large uploads
- Pass filer.Entry to parseTusChunkPath to preserve actual upload time

* Address more PR review comments for TUS protocol

- Normalize TUS path once in filer_server.go, store in option.TusPath
- Remove redundant path normalization from TUS handlers
- Remove goto statement in tusCreateHandler, simplify control flow

* Remove unnecessary mutexes in tusWriteData

The upload loop is sequential, so uploadErrLock and chunksLock are not needed.

* Rename updateTusSessionOffset to saveTusChunk

Remove unused newOffset parameter and rename function to better reflect its purpose.

* Improve TUS upload performance and add path validation

- Reuse operation.Uploader across sub-chunks for better connection reuse
- Guard against TusPath='/' to prevent hijacking all filer routes

* Address PR review comments for TUS protocol

- Fix critical chunk filename parsing: use strings.Cut instead of SplitN
  to correctly handle base64-encoded fileIds that may contain underscores
- Rename tusPath to tusBasePath for naming consistency across codebase
- Add background garbage collection for expired TUS sessions (runs hourly)
- Improve error messages with %w wrapping for better debuggability
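
To illustrate why the parsing fix matters, here is a sketch of a chunk file name that carries offset, size, and a base64-encoded fileId. The exact layout used by the filer is not shown in this commit message, so the `<offset>_<size>_<base64url(fileId)>` format and both function names are assumptions. The point is that the URL-safe base64 alphabet includes `_`, so the parser consumes only the first two separators with strings.Cut and treats the remainder as opaque.

```go
package main

import (
	"encoding/base64"
	"fmt"
	"strconv"
	"strings"
)

// encodeChunkName builds a name in an assumed layout:
// <offset>_<size>_<base64url(fileId)>. Hypothetical, for illustration only.
func encodeChunkName(offset, size int64, fileId string) string {
	encoded := base64.RawURLEncoding.EncodeToString([]byte(fileId))
	return fmt.Sprintf("%016d_%d_%s", offset, size, encoded)
}

// parseChunkName reverses encodeChunkName. Only the first two underscores
// are separators; any underscore after them belongs to the base64 payload.
func parseChunkName(name string) (offset, size int64, fileId string, err error) {
	offsetStr, rest, ok := strings.Cut(name, "_")
	if !ok {
		return 0, 0, "", fmt.Errorf("chunk name %q: missing offset separator", name)
	}
	sizeStr, encoded, ok := strings.Cut(rest, "_")
	if !ok {
		return 0, 0, "", fmt.Errorf("chunk name %q: missing size separator", name)
	}
	if offset, err = strconv.ParseInt(offsetStr, 10, 64); err != nil {
		return 0, 0, "", fmt.Errorf("chunk name %q: offset: %w", name, err)
	}
	if size, err = strconv.ParseInt(sizeStr, 10, 64); err != nil {
		return 0, 0, "", fmt.Errorf("chunk name %q: size: %w", name, err)
	}
	raw, err := base64.RawURLEncoding.DecodeString(encoded)
	if err != nil {
		return 0, 0, "", fmt.Errorf("chunk name %q: fileId: %w", name, err)
	}
	return offset, size, string(raw), nil
}

func main() {
	name := encodeChunkName(4194304, 1024, "3,01637037d6")
	fmt.Println(name)
	fmt.Println(parseChunkName(name))
}
```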

* Address additional TUS PR review comments

- Fix tusBasePath default to use leading slash (/.tus) for consistency
- Add chunk contiguity validation in completeTusUpload to detect gaps/overlaps
- Fix offset calculation to find maximum contiguous range from 0, not just last chunk
- Return 413 Request Entity Too Large instead of silently truncating content
- Document tusChunkSize rationale (4MB balances memory vs request overhead)
- Fix Makefile xargs portability by removing GNU-specific -r flag
- Add explicit -tusBasePath flag to integration test for robustness
- Fix README example to use /.uploads/tus path format
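
The "maximum contiguous range from 0" calculation can be sketched as follows. This is an illustrative sketch, not the code from this commit: the chunk type and contiguousOffset are hypothetical names, and this version tolerates overlapping chunks, whereas the commit describes completeTusUpload as rejecting gaps and overlaps.

```go
package main

import (
	"fmt"
	"sort"
)

// chunk is a hypothetical stand-in for a stored TUS chunk record.
type chunk struct {
	Offset int64
	Size   int64
}

// contiguousOffset returns the end of the longest run of chunks that
// starts at byte 0 with no gap. Chunks beyond the first gap are ignored,
// so the reported upload offset never skips over missing data.
func contiguousOffset(chunks []chunk) int64 {
	sorted := append([]chunk(nil), chunks...) // do not reorder the caller's slice
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Offset < sorted[j].Offset })
	var end int64
	for _, c := range sorted {
		if c.Offset > end {
			break // gap: bytes [end, c.Offset) have not arrived yet
		}
		if e := c.Offset + c.Size; e > end {
			end = e
		}
	}
	return end
}

func main() {
	chunks := []chunk{{Offset: 4, Size: 4}, {Offset: 0, Size: 4}, {Offset: 12, Size: 4}}
	fmt.Println(contiguousOffset(chunks))
}
```

Taking the offset of only the last chunk would report 16 for the input in main even though bytes 8 through 11 are missing; scanning from 0 stops at the gap.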

* Revert log_buffer changes (moved to separate PR)

* Minor style fixes from PR review

- Simplify tusBasePath flag description to use example format
- Add 'TUS upload' prefix to session not found error message
- Remove duplicate tusChunkSize comment
- Capitalize warning message for consistency
- Add grep filter to Makefile xargs for better empty input handling
2025-12-14 21:56:07 -08:00


package weed_server

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/stats"

	"google.golang.org/grpc"

	"github.com/seaweedfs/seaweedfs/weed/util/grace"

	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/arangodb"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/cassandra2"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/elastic/v7"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/etcd"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/foundationdb"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/hbase"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb2"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/leveldb3"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/mongodb"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/mysql"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/mysql2"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/postgres2"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/redis"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/redis2"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/redis3"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/sqlite"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/tarantool"
	_ "github.com/seaweedfs/seaweedfs/weed/filer/ydb"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/notification"
	_ "github.com/seaweedfs/seaweedfs/weed/notification/aws_sqs"
	_ "github.com/seaweedfs/seaweedfs/weed/notification/gocdk_pub_sub"
	_ "github.com/seaweedfs/seaweedfs/weed/notification/google_pub_sub"
	_ "github.com/seaweedfs/seaweedfs/weed/notification/kafka"
	_ "github.com/seaweedfs/seaweedfs/weed/notification/log"
	_ "github.com/seaweedfs/seaweedfs/weed/notification/webhook"
	"github.com/seaweedfs/seaweedfs/weed/security"
)

type FilerOption struct {
	Masters                   *pb.ServerDiscovery
	FilerGroup                string
	Collection                string
	DefaultReplication        string
	DisableDirListing         bool
	MaxMB                     int
	DirListingLimit           int
	DataCenter                string
	Rack                      string
	DataNode                  string
	DefaultLevelDbDir         string
	DisableHttp               bool
	Host                      pb.ServerAddress
	recursiveDelete           bool
	Cipher                    bool
	SaveToFilerLimit          int64
	ConcurrentUploadLimit     int64
	ConcurrentFileUploadLimit int64
	ShowUIDirectoryDelete     bool
	DownloadMaxBytesPs        int64
	DiskType                  string
	AllowedOrigins            []string
	ExposeDirectoryData       bool
	TusBasePath               string
}

type FilerServer struct {
	inFlightDataSize int64
	inFlightUploads  int64
	listenersWaits   int64

	// notifying clients
	listenersLock         sync.Mutex
	listenersCond         *sync.Cond
	inFlightDataLimitCond *sync.Cond

	filer_pb.UnimplementedSeaweedFilerServer
	option         *FilerOption
	secret         security.SigningKey
	filer          *filer.Filer
	filerGuard     *security.Guard
	volumeGuard    *security.Guard
	grpcDialOption grpc.DialOption

	// metrics read from the master
	metricsAddress     string
	metricsIntervalSec int

	// track known metadata listeners
	knownListenersLock sync.Mutex
	knownListeners     map[int32]int32
}

func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {

	v := util.GetViper()
	signingKey := v.GetString("jwt.filer_signing.key")
	v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
	expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")

	readSigningKey := v.GetString("jwt.filer_signing.read.key")
	v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
	readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")

	volumeSigningKey := v.GetString("jwt.signing.key")
	v.SetDefault("jwt.signing.expires_after_seconds", 10)
	volumeExpiresAfterSec := v.GetInt("jwt.signing.expires_after_seconds")

	volumeReadSigningKey := v.GetString("jwt.signing.read.key")
	v.SetDefault("jwt.signing.read.expires_after_seconds", 60)
	volumeReadExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")

	v.SetDefault("cors.allowed_origins.values", "*")

	allowedOrigins := v.GetString("cors.allowed_origins.values")
	domains := strings.Split(allowedOrigins, ",")
	option.AllowedOrigins = domains

	v.SetDefault("filer.expose_directory_metadata.enabled", true)
	returnDirMetadata := v.GetBool("filer.expose_directory_metadata.enabled")
	option.ExposeDirectoryData = returnDirMetadata

	fs = &FilerServer{
		option:                option,
		grpcDialOption:        security.LoadClientTLS(util.GetViper(), "grpc.filer"),
		knownListeners:        make(map[int32]int32),
		inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
	}
	fs.listenersCond = sync.NewCond(&fs.listenersLock)

	option.Masters.RefreshBySrvIfAvailable()
	if len(option.Masters.GetInstances()) == 0 {
		glog.Fatal("master list is required!")
	}

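	if !util.LoadConfiguration("filer", false) {
		// No filer configuration was loaded: fall back to a leveldb2 store
		// in the default directory.
		v.SetDefault("leveldb2.enabled", true)
		v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
		// MkdirAll is a no-op when the directory already exists, so no Stat is needed.
		if mkErr := os.MkdirAll(option.DefaultLevelDbDir, 0755); mkErr != nil {
			glog.Warningf("create default filer store dir %s: %v", option.DefaultLevelDbDir, mkErr)
		}
		glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
	} else {
		glog.Warningf("skipping default store dir in %s", option.DefaultLevelDbDir)
	}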
	util.LoadConfiguration("notification", false)

	v.SetDefault("filer.options.max_file_name_length", 255)
	maxFilenameLength := v.GetUint32("filer.options.max_file_name_length")
	glog.V(0).Infof("max_file_name_length %d", maxFilenameLength)
	fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, maxFilenameLength, func() {
		if atomic.LoadInt64(&fs.listenersWaits) > 0 {
			fs.listenersCond.Broadcast()
		}
	})
	fs.filer.Cipher = option.Cipher
	// we do not support IP whitelist right now https://github.com/seaweedfs/seaweedfs/issues/7094
	if v.GetString("guard.white_list") != "" {
		glog.Warningf("filer: guard.white_list is configured but the IP whitelist feature is currently disabled. See https://github.com/seaweedfs/seaweedfs/issues/7094")
	}
	fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
	fs.volumeGuard = security.NewGuard([]string{}, volumeSigningKey, volumeExpiresAfterSec, volumeReadSigningKey, volumeReadExpiresAfterSec)

	fs.checkWithMaster()

	go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
	go fs.filer.MasterClient.KeepConnectedToMaster(context.Background())

	fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
	v.SetDefault("filer.options.buckets_folder", "/buckets")
	fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder")
	// TODO deprecated, will be removed after 2020-12-31
	// replaced by https://github.com/seaweedfs/seaweedfs/wiki/Path-Specific-Configuration
	// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
	isFresh := fs.filer.LoadConfiguration(v)

	notification.LoadConfiguration(v, "notification.")

	handleStaticResources(defaultMux)
	if !option.DisableHttp {
		defaultMux.HandleFunc("/healthz", requestIDMiddleware(fs.filerHealthzHandler))
		// TUS resumable upload protocol handler
		if option.TusBasePath != "" {
			// Normalize TusBasePath to always have a leading slash and no trailing slash
			if !strings.HasPrefix(option.TusBasePath, "/") {
				option.TusBasePath = "/" + option.TusBasePath
			}
			option.TusBasePath = strings.TrimRight(option.TusBasePath, "/")
			// Disallow using "/" as TUS base to avoid hijacking all filer routes
			if option.TusBasePath == "" {
				glog.Warningf("Invalid TUS base path; TUS disabled (must not be root '/')")
			} else {
				// Registered before the catch-all "/" route below
				handlePath := option.TusBasePath + "/"
				defaultMux.HandleFunc(handlePath, fs.filerGuard.WhiteList(requestIDMiddleware(fs.tusHandler)))
				// Start background cleanup of expired TUS sessions (every hour)
				fs.StartTusSessionCleanup(1 * time.Hour)
			}
		}
		defaultMux.HandleFunc("/", fs.filerGuard.WhiteList(requestIDMiddleware(fs.filerHandler)))
	}
	if defaultMux != readonlyMux {
		handleStaticResources(readonlyMux)
		readonlyMux.HandleFunc("/healthz", requestIDMiddleware(fs.filerHealthzHandler))
		readonlyMux.HandleFunc("/", fs.filerGuard.WhiteList(requestIDMiddleware(fs.readonlyFilerHandler)))
	}

	existingNodes := fs.filer.ListExistingPeerUpdates(context.Background())
	startFromTime := time.Now().Add(-filer.LogFlushInterval)
	if isFresh {
		glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
		if err := fs.filer.MaybeBootstrapFromOnePeer(option.Host, existingNodes, startFromTime); err != nil {
			glog.Fatalf("%s bootstrap from %+v: %v", option.Host, existingNodes, err)
		}
	}
	fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)

	fs.filer.LoadFilerConf()

	fs.filer.LoadRemoteStorageConfAndMapping()

	grace.OnReload(fs.Reload)
	grace.OnInterrupt(func() {
		fs.filer.Shutdown()
	})

	fs.filer.Dlm.LockRing.SetTakeSnapshotCallback(fs.OnDlmChangeSnapshot)

	return fs, nil
}

// checkWithMaster blocks until at least one master returns its configuration,
// then records the metrics address and interval it reported.
func (fs *FilerServer) checkWithMaster() {

	isConnected := false
	for !isConnected {
		fs.option.Masters.RefreshBySrvIfAvailable()
		for _, master := range fs.option.Masters.GetInstances() {
			readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
				resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
				if err != nil {
					return fmt.Errorf("get master %s configuration: %w", master, err)
				}
				fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
				return nil
			})
			if readErr == nil {
				isConnected = true
			} else {
				time.Sleep(7 * time.Second)
			}
		}
	}

}

func (fs *FilerServer) Reload() {
	glog.V(0).Infoln("Reload filer server...")

	util.LoadConfiguration("security", false)
}