* filer: reduce allocations in MatchStorageRule
Optimize MatchStorageRule to avoid allocations in common cases:
- Return singleton emptyPathConf when no rules match (zero allocations)
- Return existing rule directly when only one rule matches (zero allocations)
- Only allocate and merge when multiple rules match (rare case)
Based on heap profile analysis showing 111MB allocated from 1.64M calls
to this function during 180 seconds of operation.
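
The three cases above can be sketched as follows. This is a minimal illustration, not the actual implementation: `pathConf`, `emptyConf`, `matchRule` and the linear prefix scan are hypothetical stand-ins for the filer's protobuf type and its rule lookup.

```go
package main

import (
	"fmt"
	"strings"
)

// pathConf is a hypothetical stand-in for filer_pb.FilerConf_PathConf.
type pathConf struct {
	LocationPrefix string
	Collection     string
	Replication    string
}

// emptyConf is shared by every no-match result; callers must treat it as read-only.
var emptyConf = &pathConf{}

// matchRule returns the configuration that applies to path.
// It allocates only when two or more rules match and have to be merged.
func matchRule(rules []*pathConf, path string) *pathConf {
	var first *pathConf
	count := 0
	for _, r := range rules {
		if strings.HasPrefix(path, r.LocationPrefix) {
			if count == 0 {
				first = r
			}
			count++
		}
	}
	switch count {
	case 0:
		return emptyConf // no allocation
	case 1:
		return first // no allocation; the value is shared with the rule set
	}
	// Rare case: merge every matching rule into a fresh value.
	merged := &pathConf{}
	for _, r := range rules {
		if !strings.HasPrefix(path, r.LocationPrefix) {
			continue
		}
		merged.LocationPrefix = r.LocationPrefix
		if r.Collection != "" {
			merged.Collection = r.Collection
		}
		if r.Replication != "" {
			merged.Replication = r.Replication
		}
	}
	return merged
}

func main() {
	rules := []*pathConf{
		{LocationPrefix: "/buckets/", Replication: "001"},
		{LocationPrefix: "/buckets/a/", Collection: "a"},
	}
	fmt.Println(matchRule(rules, "/tmp/x") == emptyConf)
	fmt.Println(matchRule(rules, "/buckets/b/x") == rules[0])
	fmt.Printf("%+v\n", *matchRule(rules, "/buckets/a/x"))
}
```

Because the zero-match and single-match results are shared values, this shape is only safe if callers never mutate what they receive.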
* filer: add fast path for getActualStore when no path-specific stores
Add hasPathSpecificStore flag to FilerStoreWrapper to skip
the MatchPrefix() call and []byte(path) conversion when no
path-specific stores are configured (the common case).
Based on heap profile analysis showing 1.39M calls to this
function during 180 seconds of operation, each requiring a
string-to-byte slice conversion for the MatchPrefix call.
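
A minimal sketch of the fast path, assuming a simplified wrapper. `store`, `storeWrapper` and `addPathSpecificStore` are hypothetical, and a map with a longest-prefix scan stands in for the real `MatchPrefix()` lookup.

```go
package main

import (
	"fmt"
	"strings"
)

// store is a hypothetical stand-in for a filer store backend.
type store struct{ name string }

// storeWrapper is a simplified stand-in for FilerStoreWrapper.
type storeWrapper struct {
	defaultStore         *store
	pathStores           map[string]*store // path prefix -> store
	hasPathSpecificStore bool              // set once a path-specific store is registered
}

// addPathSpecificStore registers s for every path under prefix.
func (w *storeWrapper) addPathSpecificStore(prefix string, s *store) {
	if w.pathStores == nil {
		w.pathStores = make(map[string]*store)
	}
	w.pathStores[prefix] = s
	w.hasPathSpecificStore = true
}

// getActualStore picks the store responsible for path.
func (w *storeWrapper) getActualStore(path string) *store {
	if !w.hasPathSpecificStore {
		// Fast path: no prefix lookup and no string-to-[]byte conversion.
		return w.defaultStore
	}
	// Slow path: the longest matching prefix wins.
	best, bestLen := w.defaultStore, -1
	for prefix, s := range w.pathStores {
		if strings.HasPrefix(path, prefix) && len(prefix) > bestLen {
			best, bestLen = s, len(prefix)
		}
	}
	return best
}

func main() {
	w := &storeWrapper{defaultStore: &store{name: "default"}}
	fmt.Println(w.getActualStore("/buckets/a").name)

	w.addPathSpecificStore("/buckets/", &store{name: "buckets"})
	fmt.Println(w.getActualStore("/buckets/a").name)
	fmt.Println(w.getActualStore("/tmp/a").name)
}
```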
* filer/foundationdb: use sync.Pool for tuple allocation in genKey
Use sync.Pool to reuse tuple.Tuple slices in genKey(), reducing
allocation overhead for every FoundationDB operation.
Based on heap profile analysis showing 102MB allocated from 1.79M
calls to genKey() during 180 seconds of operation. The Pack() call
still allocates internally, but this reduces the tuple slice
allocation overhead by ~50%.
* filer: use sync.Pool for protobuf Entry and FuseAttributes
Add pooling for filer_pb.Entry and filer_pb.FuseAttributes in
EncodeAttributesAndChunks and DecodeAttributesAndChunks to reduce
allocations during filer store operations.
Changes:
- Add pbEntryPool with pre-allocated FuseAttributes
- Add EntryAttributeToExistingPb for in-place attribute conversion
- Update ToExistingProtoEntry to reuse existing Attributes when available
Based on heap profile showing:
- EncodeAttributesAndChunks: 69.5MB cumulative
- DecodeAttributesAndChunks: 46.5MB cumulative
- EntryAttributeToPb: 47.5MB flat allocations
* log_buffer: use sync.Pool for LogEntry in readTs
Add logEntryPool to reuse filer_pb.LogEntry objects in readTs(),
which is called frequently during binary search in ReadFromBuffer.
This function only needs the TsNs field from the unmarshaled entry,
so pooling the LogEntry avoids repeated allocations.
Based on heap profile showing readTs with 188MB cumulative allocations
from timestamp lookups during log buffer reads.
* pb: reduce gRPC metadata allocations in interceptor
Optimize requestIDUnaryInterceptor and WithGrpcClient to reduce
metadata allocations on every gRPC request:
- Use AppendToOutgoingContext instead of NewOutgoingContext + New()
This avoids creating a new map[string]string for single key-value pairs
- Check FromIncomingContext return value before using metadata
Based on heap profile showing metadata operations contributing 0.45GB
(10.5%) of allocations, with requestIDUnaryInterceptor being the main
source at 0.44GB cumulative.
Expected reduction: ~0.2GB from avoiding map allocations per request.
* filer/log_buffer: address code review feedback
- Use proto.Reset() instead of manual field clearing in resetLogEntry
for more idiomatic and comprehensive state clearing
- Add resetPbEntry() call before pool return in error path for
consistency with success path in DecodeAttributesAndChunks
* log_buffer: reduce PreviousBufferCount from 32 to 4
Reduce the number of retained previous buffers from 32 to 4.
Each buffer is 8MB, so this reduces the maximum retained memory
from 256MB to 32MB for previous buffers.
Most subscribers catch up quickly, so 4 buffers (32MB) should
be sufficient while significantly reducing memory footprint.
* filer/foundationdb: use defer for tuple pool cleanup in genKey
Refactor genKey to use defer for returning the pooled tuple.
This ensures the pooled object is always returned even if
store.seaweedfsDir.Pack panics, making the code more robust.
Also simplifies the code by removing the temporary variable.
* filer: early-stop MatchStorageRule prescan after 2 matches
Stop the prescan callback after finding 2 matches since we only
need to know if there are 0, 1, or multiple matches. This avoids
unnecessarily scanning the rest of the trie when many rules exist.
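
A sketch of the early stop, assuming a callback-driven prefix walk. `walkPrefixes` and `countMatchesUpTo2` are hypothetical names, and a slice scan stands in for the trie traversal.

```go
package main

import (
	"fmt"
	"strings"
)

// walkPrefixes calls visit for every prefix that matches path and stops as
// soon as visit returns false. It stands in for a trie walk with a callback.
func walkPrefixes(prefixes []string, path string, visit func(prefix string) bool) {
	for _, p := range prefixes {
		if strings.HasPrefix(path, p) {
			if !visit(p) {
				return
			}
		}
	}
}

// countMatchesUpTo2 reports 0, 1, or 2 (meaning "two or more") matches,
// together with the number of callbacks that actually ran.
func countMatchesUpTo2(prefixes []string, path string) (matches, visited int) {
	walkPrefixes(prefixes, path, func(string) bool {
		visited++
		matches++
		return matches < 2 // stop once the answer is "multiple"
	})
	return matches, visited
}

func main() {
	prefixes := []string{"/", "/a/", "/a/b/", "/a/b/c/"}
	fmt.Println(countMatchesUpTo2(prefixes, "/a/b/c/d"))
	fmt.Println(countMatchesUpTo2(prefixes, "/x"))
}
```

The caller only branches on zero, one, or many, so the exact count beyond two is never needed.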
* fix: address critical code review issues
filer_conf.go:
- Remove mutable singleton emptyPathConf that could corrupt shared state
- Return fresh copy for no-match case and cloned copy for single-match case
- Add clonePathConf helper to create shallow copies safely
grpc_client_server.go:
- Remove incorrect AppendToOutgoingContext call in server interceptor
(that API is for outbound client calls, not server-side handlers)
- Rely on request_id.Set and SetTrailer for request ID propagation
* fix: treat FilerConf_PathConf as immutable
Fix callers that were incorrectly mutating the returned PathConf:
- filer_server_handlers_write.go: Use local variable for MaxFileNameLength
instead of mutating the shared rule
- command_s3_bucket_quota_check.go: Create new PathConf explicitly when
modifying config instead of mutating the returned one
This allows MatchStorageRule to safely return the singleton or direct
references without copying, restoring the memory optimization.
Callers must NOT mutate the returned *FilerConf_PathConf.
* filer: add ClonePathConf helper for creating mutable copies
Add reusable ClonePathConf function that creates a mutable copy of
a PathConf. This is useful when callers need to modify config before
calling SetLocationConf.
Update command_s3_bucket_quota_check.go to use the new helper.
Also fix redundant return statement in DeleteLocationConf.
* fmt
* filer: fix protobuf pool reset to clear internal fields
Address code review feedback:
1. resetPbEntry/resetFuseAttributes: Use struct assignment (*e = T{})
instead of field-by-field reset to clear protobuf internal fields
(unknownFields, sizeCache) that would otherwise accumulate across
pool reuses, causing data corruption or memory bloat.
2. EntryAttributeToExistingPb: Add nil guard for attr parameter to
prevent panic if caller passes nil.
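
The difference described in item 1 can be sketched with a plain struct. `entry` is a hypothetical stand-in for a generated protobuf message; its unexported fields only mimic the internal bookkeeping the commit refers to.

```go
package main

import "fmt"

// entry is a hypothetical stand-in for a generated protobuf message.
// The unexported fields mimic internal state kept by generated code.
type entry struct {
	Name          string
	Chunks        []string
	unknownFields []byte
	sizeCache     int32
}

// resetFieldByField clears only the fields that were listed explicitly,
// so internal state leaks into the next user of the pooled object.
func resetFieldByField(e *entry) {
	e.Name = ""
	e.Chunks = nil
}

// resetWhole overwrites the entire struct with its zero value,
// which also clears fields added later and the internal ones.
func resetWhole(e *entry) {
	*e = entry{}
}

func main() {
	a := &entry{Name: "f", unknownFields: []byte{1, 2}, sizeCache: 7}
	resetFieldByField(a)
	fmt.Println(len(a.unknownFields), a.sizeCache)

	b := &entry{Name: "f", unknownFields: []byte{1, 2}, sizeCache: 7}
	resetWhole(b)
	fmt.Println(len(b.unknownFields), b.sizeCache)
}
```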
* log_buffer: reset logEntry before pool return in error path
For consistency with success path, reset the logEntry before putting
it back in the pool in the error path. This prevents the pooled object
from holding references to partially unmarshaled data.
* filer: optimize MatchStorageRule and document ClonePathConf
1. Avoid double []byte(path) conversion in multi-match case by
converting once and reusing pathBytes.
2. Add IMPORTANT comment to ClonePathConf documenting that it must
be kept in sync with filer_pb.FilerConf_PathConf fields when
the protobuf evolves.
* filer/log_buffer: fix data race and use defer for pool cleanup
1. entry_codec.go EncodeAttributesAndChunks: Fix critical data race -
proto.Marshal may return a slice sharing memory with the message.
Copy the data before returning message to pool to prevent corruption.
2. entry_codec.go DecodeAttributesAndChunks: Use defer for cleaner
pool management, ensuring message is always returned to pool.
3. log_buffer.go readTs: Use defer for pool cleanup, removing
duplicated resetLogEntry/Put calls in success and error paths.
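
The hazard in item 1 and the copy-before-Put remedy can be sketched as follows. `message`, `marshal`, `messagePool` and `encode` are hypothetical; the scratch-buffer encoder is an assumption made only to show what goes wrong when returned bytes share memory with a pooled object.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical pooled object whose encoder reuses an internal
// scratch buffer.
type message struct {
	scratch []byte
}

// marshal encodes s into the scratch buffer and returns a slice that shares
// memory with the message.
func (m *message) marshal(s string) []byte {
	m.scratch = append(m.scratch[:0], s...)
	return m.scratch
}

var messagePool = sync.Pool{
	New: func() interface{} { return new(message) },
}

// encode copies the encoded bytes before the message goes back to the pool,
// so a later user of the same message cannot overwrite the returned data.
func encode(s string) []byte {
	m := messagePool.Get().(*message)
	defer messagePool.Put(m)
	data := m.marshal(s)
	out := make([]byte, len(data))
	copy(out, data)
	return out
}

func main() {
	// Without a copy, the second marshal overwrites the first result.
	m := new(message)
	first := m.marshal("first")
	m.marshal("other")
	fmt.Println(string(first))

	// With the copy, earlier results stay intact.
	a := encode("first")
	encode("other")
	fmt.Println(string(a))
}
```

The copy costs one allocation per call, which is the price of handing the caller memory that no pooled object can touch again.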
* filer: fix ClonePathConf field order and add comprehensive test
1. Fix field order in ClonePathConf to match protobuf struct definition
(WormGracePeriodSeconds before WormRetentionTimeSeconds).
2. Add TestClonePathConf that constructs a fully-populated PathConf,
calls ClonePathConf, and asserts equality of all exported fields.
This will catch future schema drift when new fields are added.
3. Add TestClonePathConfNil to verify nil handling.
* filer: use reflection in ClonePathConf test to detect schema drift
Replace hardcoded field comparisons with reflection-based comparison.
This automatically catches:
1. New fields added to the protobuf but not copied in ClonePathConf
2. Missing non-zero test values for any exported field
The test iterates over all exported fields using reflect and compares
src vs clone values, failing if any field differs.
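
A minimal sketch of such a reflection-based check. `pathConf`, `clonePathConf` and `checkClone` are hypothetical stand-ins; the real test operates on `filer_pb.FilerConf_PathConf` and `ClonePathConf`.

```go
package main

import (
	"fmt"
	"reflect"
)

// pathConf is a hypothetical, reduced stand-in for the protobuf struct.
type pathConf struct {
	LocationPrefix    string
	Collection        string
	Ttl               string
	ReadOnly          bool
	MaxFileNameLength uint32
}

// clonePathConf copies every field by hand, as a hand-written clone helper would.
func clonePathConf(src *pathConf) *pathConf {
	if src == nil {
		return nil
	}
	return &pathConf{
		LocationPrefix:    src.LocationPrefix,
		Collection:        src.Collection,
		Ttl:               src.Ttl,
		ReadOnly:          src.ReadOnly,
		MaxFileNameLength: src.MaxFileNameLength,
	}
}

// checkClone returns the exported fields left at their zero value in src
// (a gap in the test data) and the exported fields that differ in dst.
func checkClone(src, dst *pathConf) (unset, differ []string) {
	sv := reflect.ValueOf(src).Elem()
	dv := reflect.ValueOf(dst).Elem()
	for i := 0; i < sv.NumField(); i++ {
		f := sv.Type().Field(i)
		if f.PkgPath != "" {
			continue // skip unexported fields
		}
		if sv.Field(i).IsZero() {
			unset = append(unset, f.Name)
		}
		if !reflect.DeepEqual(sv.Field(i).Interface(), dv.Field(i).Interface()) {
			differ = append(differ, f.Name)
		}
	}
	return unset, differ
}

func main() {
	src := &pathConf{
		LocationPrefix:    "/buckets/",
		Collection:        "c",
		Ttl:               "1d",
		ReadOnly:          true,
		MaxFileNameLength: 255,
	}
	unset, differ := checkClone(src, clonePathConf(src))
	fmt.Println(len(unset), len(differ))
}
```

A field added to the struct but not to the clone shows up in `differ`, and a field added without a non-zero test value shows up in `unset`.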
* filer: update EntryAttributeToExistingPb comment to reflect nil handling
The function safely handles nil attr by returning early, but the comment
incorrectly stated 'attr must not be nil'. Update comment to accurately
describe the defensive behavior.
* Fix review feedback: restore request ID propagation and remove redundant resets
1. grpc_client_server.go: Restore AppendToOutgoingContext for request ID
so handlers making downstream gRPC calls will automatically propagate
the request ID to downstream services.
2. entry_codec.go: Remove redundant resetPbEntry calls after Get.
The defer block ensures reset before Put, so next Get receives clean object.
3. log_buffer.go: Remove redundant resetLogEntry call after Get for
same reason - defer already handles reset before Put.
771 lines
23 KiB
Go
//go:build foundationdb
// +build foundationdb

// Package foundationdb provides a filer store implementation using FoundationDB as the backend.
//
// IMPORTANT DESIGN NOTE - DeleteFolderChildren and Transaction Limits:
//
// FoundationDB imposes strict transaction limits:
// - Maximum transaction size: 10MB
// - Maximum transaction duration: 5 seconds
//
// The DeleteFolderChildren operation always uses batched deletion with multiple small transactions
// to safely handle directories of any size. Even if called within an existing transaction context,
// it will create its own batch transactions to avoid exceeding FDB limits.
//
// This means DeleteFolderChildren is NOT atomic with respect to an outer transaction - it manages
// its own transaction boundaries for safety and reliability.

package foundationdb

import (
	"bytes"
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/apple/foundationdb/bindings/go/src/fdb"
	"github.com/apple/foundationdb/bindings/go/src/fdb/directory"
	"github.com/apple/foundationdb/bindings/go/src/fdb/tuple"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

const (
	// FoundationDB transaction size limit is 10MB
	FDB_TRANSACTION_SIZE_LIMIT = 10 * 1024 * 1024
	// Safe limit for batch size (leave margin for FDB overhead)
	FDB_BATCH_SIZE_LIMIT = 8 * 1024 * 1024
	// Maximum number of entries to return in a single directory listing
	// Large batches can cause transaction timeouts and increase memory pressure
	MAX_DIRECTORY_LIST_LIMIT = 1000

	// Write batching defaults
	DEFAULT_BATCH_SIZE     = 100
	DEFAULT_BATCH_INTERVAL = 5 * time.Millisecond
)

func init() {
	filer.Stores = append(filer.Stores, &FoundationDBStore{})
}

// writeOp represents a pending write operation
type writeOp struct {
	key   fdb.Key
	value []byte // nil for delete
	done  chan error
}

// size returns the approximate size of an operation in bytes
func (op *writeOp) size() int {
	return len(op.key) + len(op.value)
}

// writeBatcher batches multiple writes into single transactions
type writeBatcher struct {
	store    *FoundationDBStore
	ops      chan *writeOp
	stop     chan struct{}
	wg       sync.WaitGroup
	size     int
	interval time.Duration
}

func newWriteBatcher(store *FoundationDBStore, size int, interval time.Duration) *writeBatcher {
	b := &writeBatcher{
		store:    store,
		ops:      make(chan *writeOp, size*10),
		stop:     make(chan struct{}),
		size:     size,
		interval: interval,
	}
	b.wg.Add(1)
	go b.run()
	return b
}

func (b *writeBatcher) run() {
	defer b.wg.Done()
	batch := make([]*writeOp, 0, b.size)
	batchBytes := 0 // Track cumulative size of batch
	timer := time.NewTimer(b.interval)
	defer timer.Stop()

	// flush commits the pending batch in one transaction and reports the
	// result to every waiting submitter.
	flush := func() {
		if len(batch) == 0 {
			return
		}
		_, err := b.store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
			for _, op := range batch {
				if op.value != nil {
					tr.Set(op.key, op.value)
				} else {
					tr.Clear(op.key)
				}
			}
			return nil, nil
		})
		for _, op := range batch {
			if op.done != nil {
				op.done <- err
				close(op.done)
			}
		}
		batch = batch[:0]
		batchBytes = 0
	}

	// resetTimer stops the timer, drains a pending tick if there is one, and
	// restarts it.
	resetTimer := func() {
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(b.interval)
	}

	for {
		select {
		case op := <-b.ops:
			batch = append(batch, op)
			batchBytes += op.size()
			// Flush when batch count or size limit is reached
			if len(batch) >= b.size || batchBytes >= FDB_BATCH_SIZE_LIMIT {
				flush()
				resetTimer()
			}
		case <-timer.C:
			flush()
			// Timer already fired, safe to reset directly
			timer.Reset(b.interval)
		case <-b.stop:
			// Drain whatever is still queued, applying the same count and size
			// limits as the normal path so no single transaction grows too large.
			for {
				select {
				case op := <-b.ops:
					batch = append(batch, op)
					batchBytes += op.size()
					if len(batch) >= b.size || batchBytes >= FDB_BATCH_SIZE_LIMIT {
						flush()
					}
				default:
					flush()
					return
				}
			}
		}
	}
}

func (b *writeBatcher) submit(key fdb.Key, value []byte, wait bool) error {
	op := &writeOp{key: key, value: value}
	if wait {
		op.done = make(chan error, 1)
	}
	select {
	case b.ops <- op:
		if wait {
			return <-op.done
		}
		return nil
	case <-b.stop:
		return fmt.Errorf("batcher stopped")
	}
}

func (b *writeBatcher) shutdown() {
	close(b.stop)
	b.wg.Wait()
}

type FoundationDBStore struct {
	database        fdb.Database
	seaweedfsDir    directory.DirectorySubspace
	kvDir           directory.DirectorySubspace
	directoryPrefix string
	timeout         time.Duration
	maxRetryDelay   time.Duration
	// Write batching
	batcher       *writeBatcher
	batchSize     int
	batchInterval time.Duration
}

// Context key type for storing transactions
type contextKey string

const transactionKey contextKey = "fdb_transaction"

// Helper functions for context-scoped transactions
func (store *FoundationDBStore) getTransactionFromContext(ctx context.Context) (fdb.Transaction, bool) {
	// The type assertion yields the zero fdb.Transaction and false when the key
	// is absent or holds a value of another type.
	tx, ok := ctx.Value(transactionKey).(fdb.Transaction)
	return tx, ok
}

func (store *FoundationDBStore) setTransactionInContext(ctx context.Context, tx fdb.Transaction) context.Context {
	return context.WithValue(ctx, transactionKey, tx)
}

func (store *FoundationDBStore) GetName() string {
	return "foundationdb"
}

func (store *FoundationDBStore) Initialize(configuration util.Configuration, prefix string) error {
	// Set default configuration values
	configuration.SetDefault(prefix+"cluster_file", "/etc/foundationdb/fdb.cluster")
	configuration.SetDefault(prefix+"api_version", 740)
	configuration.SetDefault(prefix+"timeout", "5s")
	configuration.SetDefault(prefix+"max_retry_delay", "1s")
	configuration.SetDefault(prefix+"directory_prefix", "seaweedfs")
	configuration.SetDefault(prefix+"batch_size", DEFAULT_BATCH_SIZE)
	configuration.SetDefault(prefix+"batch_interval", DEFAULT_BATCH_INTERVAL.String())

	clusterFile := configuration.GetString(prefix + "cluster_file")
	apiVersion := configuration.GetInt(prefix + "api_version")
	timeoutStr := configuration.GetString(prefix + "timeout")
	maxRetryDelayStr := configuration.GetString(prefix + "max_retry_delay")
	store.directoryPrefix = configuration.GetString(prefix + "directory_prefix")

	// Parse timeout values
	var err error
	store.timeout, err = time.ParseDuration(timeoutStr)
	if err != nil {
		return fmt.Errorf("invalid timeout duration %s: %w", timeoutStr, err)
	}

	store.maxRetryDelay, err = time.ParseDuration(maxRetryDelayStr)
	if err != nil {
		return fmt.Errorf("invalid max_retry_delay duration %s: %w", maxRetryDelayStr, err)
	}

	// Parse batch configuration
	store.batchSize = configuration.GetInt(prefix + "batch_size")
	if store.batchSize <= 0 {
		store.batchSize = DEFAULT_BATCH_SIZE
	}
	batchIntervalStr := configuration.GetString(prefix + "batch_interval")
	store.batchInterval, err = time.ParseDuration(batchIntervalStr)
	if err != nil {
		glog.Warningf("invalid %sbatch_interval duration %q, using default %v: %v", prefix, batchIntervalStr, DEFAULT_BATCH_INTERVAL, err)
		store.batchInterval = DEFAULT_BATCH_INTERVAL
	}

	return store.initialize(clusterFile, apiVersion)
}

func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) error {
	glog.V(0).Infof("FoundationDB: connecting to cluster file: %s, API version: %d", clusterFile, apiVersion)

	// Set FDB API version
	if err := fdb.APIVersion(apiVersion); err != nil {
		return fmt.Errorf("failed to set FoundationDB API version %d: %w", apiVersion, err)
	}

	// Open database
	var err error
	store.database, err = fdb.OpenDatabase(clusterFile)
	if err != nil {
		return fmt.Errorf("failed to open FoundationDB database: %w", err)
	}

	// Create/open seaweedfs directory
	store.seaweedfsDir, err = directory.CreateOrOpen(store.database, []string{store.directoryPrefix}, nil)
	if err != nil {
		return fmt.Errorf("failed to create/open seaweedfs directory: %w", err)
	}

	// Create/open kv subdirectory for key-value operations
	store.kvDir, err = directory.CreateOrOpen(store.database, []string{store.directoryPrefix, "kv"}, nil)
	if err != nil {
		return fmt.Errorf("failed to create/open kv directory: %w", err)
	}

	// Start write batcher for improved throughput
	store.batcher = newWriteBatcher(store, store.batchSize, store.batchInterval)
	glog.V(0).Infof("FoundationDB: write batching enabled (batch_size=%d, batch_interval=%v)",
		store.batchSize, store.batchInterval)

	glog.V(0).Infof("FoundationDB store initialized successfully with directory prefix: %s", store.directoryPrefix)
	return nil
}

func (store *FoundationDBStore) BeginTransaction(ctx context.Context) (context.Context, error) {
	// Check if there's already a transaction in this context
	if _, exists := store.getTransactionFromContext(ctx); exists {
		return ctx, fmt.Errorf("transaction already in progress for this context")
	}

	// Create a new transaction
	tx, err := store.database.CreateTransaction()
	if err != nil {
		return ctx, fmt.Errorf("failed to create transaction: %w", err)
	}

	// Store the transaction in context and return the new context
	newCtx := store.setTransactionInContext(ctx, tx)
	return newCtx, nil
}

func (store *FoundationDBStore) CommitTransaction(ctx context.Context) error {
	// Get transaction from context
	tx, exists := store.getTransactionFromContext(ctx)
	if !exists {
		return fmt.Errorf("no transaction in progress for this context")
	}

	// Commit the transaction
	err := tx.Commit().Get()
	if err != nil {
		return fmt.Errorf("failed to commit transaction: %w", err)
	}

	return nil
}

func (store *FoundationDBStore) RollbackTransaction(ctx context.Context) error {
	// Get transaction from context
	tx, exists := store.getTransactionFromContext(ctx)
	if !exists {
		return fmt.Errorf("no transaction in progress for this context")
	}

	// Cancel the transaction
	tx.Cancel()
	return nil
}

func (store *FoundationDBStore) InsertEntry(ctx context.Context, entry *filer.Entry) error {
	return store.UpdateEntry(ctx, entry)
}

func (store *FoundationDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry) error {
	key := store.genKey(entry.DirAndName())

	value, err := entry.EncodeAttributesAndChunks()
	if err != nil {
		return fmt.Errorf("encoding %s %+v: %w", entry.FullPath, entry.Attr, err)
	}

	if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
		value = util.MaybeGzipData(value)
	}

	// Check transaction size limit
	if len(value) > FDB_TRANSACTION_SIZE_LIMIT {
		return fmt.Errorf("entry %s exceeds FoundationDB transaction size limit (%d > %d bytes)",
			entry.FullPath, len(value), FDB_TRANSACTION_SIZE_LIMIT)
	}

	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		tx.Set(key, value)
		return nil
	}

	// Use write batcher for better throughput
	if store.batcher != nil {
		return store.batcher.submit(key, value, true)
	}

	// Fallback: execute in a new transaction
	_, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Set(key, value)
		return nil, nil
	})

	if err != nil {
		return fmt.Errorf("persisting %s: %w", entry.FullPath, err)
	}

	return nil
}

func (store *FoundationDBStore) FindEntry(ctx context.Context, fullpath util.FullPath) (entry *filer.Entry, err error) {
	key := store.genKey(util.FullPath(fullpath).DirAndName())

	var data []byte
	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		data, err = tx.Get(key).Get()
	} else {
		var result interface{}
		result, err = store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
			return rtr.Get(key).Get()
		})
		if err == nil {
			if resultBytes, ok := result.([]byte); ok {
				data = resultBytes
			}
		}
	}

	if err != nil {
		return nil, fmt.Errorf("find entry %s: %w", fullpath, err)
	}

	if data == nil {
		return nil, filer_pb.ErrNotFound
	}

	entry = &filer.Entry{
		FullPath: fullpath,
	}

	err = entry.DecodeAttributesAndChunks(util.MaybeDecompressData(data))
	if err != nil {
		return entry, fmt.Errorf("decode %s : %w", entry.FullPath, err)
	}

	return entry, nil
}

func (store *FoundationDBStore) DeleteEntry(ctx context.Context, fullpath util.FullPath) error {
	key := store.genKey(util.FullPath(fullpath).DirAndName())

	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		tx.Clear(key)
		return nil
	}

	// Use write batcher for better throughput (nil value = delete)
	if store.batcher != nil {
		return store.batcher.submit(key, nil, true)
	}

	// Fallback: execute in a new transaction
	_, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Clear(key)
		return nil, nil
	})

	if err != nil {
		return fmt.Errorf("deleting %s: %w", fullpath, err)
	}

	return nil
}

func (store *FoundationDBStore) DeleteFolderChildren(ctx context.Context, fullpath util.FullPath) error {
	// Recursively delete all entries in this directory and its subdirectories
	// We need recursion because our key structure is tuple{dirPath, fileName}
	// not tuple{dirPath, ...pathComponents}, so a simple prefix range won't catch subdirectories

	// ALWAYS use batched deletion to safely handle directories of any size.
	// This avoids FoundationDB's 10MB transaction size and 5s timeout limits.
	//
	// Note: Even if called within an existing transaction, we create our own batch transactions.
	// This means DeleteFolderChildren is NOT atomic with an outer transaction, but it ensures
	// reliability and prevents transaction limit violations.
	return store.deleteFolderChildrenInBatches(ctx, fullpath)
}

// deleteFolderChildrenInBatches deletes directory contents in multiple transactions
// to avoid hitting FoundationDB's transaction size (10MB) and time (5s) limits
func (store *FoundationDBStore) deleteFolderChildrenInBatches(ctx context.Context, fullpath util.FullPath) error {
	const BATCH_SIZE = 100 // Delete up to 100 entries per transaction

	// Ensure listing and recursion run outside of any ambient transaction.
	// The typed-nil sentinel shadows an outer transaction stored under
	// transactionKey; because it is not an fdb.Transaction, the lookup in
	// getTransactionFromContext reports that no transaction exists.
	ctxNoTxn := context.WithValue(ctx, transactionKey, (*struct{})(nil))

	for {
		// Collect one batch of entries
		var entriesToDelete []util.FullPath
		var subDirectories []util.FullPath

		// List entries - we'll process BATCH_SIZE at a time
		_, err := store.ListDirectoryEntries(ctxNoTxn, fullpath, "", true, int64(BATCH_SIZE), func(entry *filer.Entry) (bool, error) {
			entriesToDelete = append(entriesToDelete, entry.FullPath)
			if entry.IsDirectory() {
				subDirectories = append(subDirectories, entry.FullPath)
			}
			return true, nil
		})

		if err != nil {
			return fmt.Errorf("listing children of %s: %w", fullpath, err)
		}

		// If no entries found, we're done
		if len(entriesToDelete) == 0 {
			break
		}

		// Recursively delete subdirectories first (also in batches)
		for _, subDir := range subDirectories {
			if err := store.deleteFolderChildrenInBatches(ctxNoTxn, subDir); err != nil {
				return err
			}
		}

		// Delete this batch of entries in a single transaction
		_, err = store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
			txCtx := store.setTransactionInContext(context.Background(), tr)
			for _, entryPath := range entriesToDelete {
				if delErr := store.DeleteEntry(txCtx, entryPath); delErr != nil {
					return nil, fmt.Errorf("deleting entry %s: %w", entryPath, delErr)
				}
			}
			return nil, nil
		})

		if err != nil {
			return err
		}

		// If we got fewer entries than BATCH_SIZE, we're done with this directory
		if len(entriesToDelete) < BATCH_SIZE {
			break
		}
	}

	return nil
}

func (store *FoundationDBStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	return store.ListDirectoryPrefixedEntries(ctx, dirPath, startFileName, includeStartFile, limit, "", eachEntryFunc)
}

func (store *FoundationDBStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc filer.ListEachEntryFunc) (lastFileName string, err error) {
	// Cap limit for optimal FoundationDB performance
	// Large batches can cause transaction timeouts and increase memory pressure
	if limit > MAX_DIRECTORY_LIST_LIMIT || limit <= 0 {
		limit = MAX_DIRECTORY_LIST_LIMIT
	}

	// Get the range for the entire directory first
	dirTuple := tuple.Tuple{string(dirPath)}
	dirRange, err := fdb.PrefixRange(store.seaweedfsDir.Pack(dirTuple))
	if err != nil {
		return "", fmt.Errorf("creating prefix range for %s: %w", dirPath, err)
	}

	// Determine the key range for the scan
	// Use FDB's range capabilities to only fetch keys matching the prefix
	var beginKey, endKey fdb.Key
	dirBeginConv, dirEndConv := dirRange.FDBRangeKeys()
	dirBegin := dirBeginConv.FDBKey()
	dirEnd := dirEndConv.FDBKey()

	if prefix != "" {
		// Build range by bracketing the filename component
		// Start at Pack(dirPath, prefix) and end at Pack(dirPath, nextPrefix)
		// where nextPrefix is the next lexicographic string
		beginKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), prefix})
		endKey = dirEnd

		// Use Strinc to get the next string for proper prefix range
		if nextPrefix, strincErr := fdb.Strinc([]byte(prefix)); strincErr == nil {
			endKey = store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), string(nextPrefix)})
		}
	} else {
		// Use entire directory range
		beginKey = dirBegin
		endKey = dirEnd
	}

	// Determine start key and selector based on startFileName
	var beginSelector fdb.KeySelector
	if startFileName != "" {
		// Start from the specified file
		startKey := store.seaweedfsDir.Pack(tuple.Tuple{string(dirPath), startFileName})
		if includeStartFile {
			beginSelector = fdb.FirstGreaterOrEqual(startKey)
		} else {
			beginSelector = fdb.FirstGreaterThan(startKey)
		}
		// Ensure beginSelector is within our desired range
		if bytes.Compare(beginSelector.Key.FDBKey(), beginKey.FDBKey()) < 0 {
			beginSelector = fdb.FirstGreaterOrEqual(beginKey)
		}
	} else {
		// Start from beginning of the range
		beginSelector = fdb.FirstGreaterOrEqual(beginKey)
	}

	// End selector is the end of our calculated range
	endSelector := fdb.FirstGreaterOrEqual(endKey)

	var kvs []fdb.KeyValue
	var rangeErr error
	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector}
		kvs, rangeErr = tx.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
		if rangeErr != nil {
			return "", fmt.Errorf("scanning %s: %w", dirPath, rangeErr)
		}
	} else {
		result, err := store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
			sr := fdb.SelectorRange{Begin: beginSelector, End: endSelector}
			kvSlice, err := rtr.GetRange(sr, fdb.RangeOptions{Limit: int(limit)}).GetSliceWithError()
			if err != nil {
				return nil, err
			}
			return kvSlice, nil
		})
		if err != nil {
			return "", fmt.Errorf("scanning %s: %w", dirPath, err)
		}
		var ok bool
		kvs, ok = result.([]fdb.KeyValue)
		if !ok {
			return "", fmt.Errorf("unexpected type from ReadTransact: %T, expected []fdb.KeyValue", result)
		}
	}

	for _, kv := range kvs {
		fileName, extractErr := store.extractFileName(kv.Key)
		if extractErr != nil {
			glog.Warningf("list %s: failed to extract fileName from key %v: %v", dirPath, kv.Key, extractErr)
			continue
		}

		entry := &filer.Entry{
			FullPath: util.NewFullPath(string(dirPath), fileName),
		}

		if decodeErr := entry.DecodeAttributesAndChunks(util.MaybeDecompressData(kv.Value)); decodeErr != nil {
			glog.V(0).Infof("list %s : %v", entry.FullPath, decodeErr)
			continue
		}

		resEachEntryFunc, resEachEntryFuncErr := eachEntryFunc(entry)
		if resEachEntryFuncErr != nil {
			glog.ErrorfCtx(ctx, "failed to process eachEntryFunc for entry %q: %v", fileName, resEachEntryFuncErr)
			return lastFileName, fmt.Errorf("failed to process eachEntryFunc for entry %q: %w", fileName, resEachEntryFuncErr)
		}
		if !resEachEntryFunc {
			break
		}

		lastFileName = fileName
	}

	return lastFileName, nil
}

// KV operations
func (store *FoundationDBStore) KvPut(ctx context.Context, key []byte, value []byte) error {
	fdbKey := store.kvDir.Pack(tuple.Tuple{key})

	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		tx.Set(fdbKey, value)
		return nil
	}

	_, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Set(fdbKey, value)
		return nil, nil
	})

	return err
}

func (store *FoundationDBStore) KvGet(ctx context.Context, key []byte) ([]byte, error) {
	fdbKey := store.kvDir.Pack(tuple.Tuple{key})

	var data []byte
	var err error

	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		data, err = tx.Get(fdbKey).Get()
	} else {
		var result interface{}
		result, err = store.database.ReadTransact(func(rtr fdb.ReadTransaction) (interface{}, error) {
			return rtr.Get(fdbKey).Get()
		})
		if err == nil {
			if resultBytes, ok := result.([]byte); ok {
				data = resultBytes
			}
		}
	}

	if err != nil {
		return nil, fmt.Errorf("kv get %s: %w", string(key), err)
	}
	if data == nil {
		return nil, filer.ErrKvNotFound
	}

	return data, nil
}

func (store *FoundationDBStore) KvDelete(ctx context.Context, key []byte) error {
	fdbKey := store.kvDir.Pack(tuple.Tuple{key})

	// Check if there's a transaction in context
	if tx, exists := store.getTransactionFromContext(ctx); exists {
		tx.Clear(fdbKey)
		return nil
	}

	_, err := store.database.Transact(func(tr fdb.Transaction) (interface{}, error) {
		tr.Clear(fdbKey)
		return nil, nil
	})

	return err
}

func (store *FoundationDBStore) Shutdown() {
	// Stop write batcher
	if store.batcher != nil {
		store.batcher.shutdown()
		store.batcher = nil
	}
	// FoundationDB doesn't have an explicit close method for Database
	glog.V(0).Infof("FoundationDB store shutdown")
}

// tuplePool reduces allocations in genKey, which is called on every FDB operation
var tuplePool = sync.Pool{
	New: func() interface{} {
		// Pre-allocate a tuple of length 2 for (dirPath, fileName)
		t := make(tuple.Tuple, 2)
		return &t
	},
}

// Helper functions
func (store *FoundationDBStore) genKey(dirPath, fileName string) fdb.Key {
	// Get a tuple from pool to avoid slice allocation
	tp := tuplePool.Get().(*tuple.Tuple)
	defer func() {
		// Clear references before returning to pool to avoid memory leaks
		(*tp)[0] = nil
		(*tp)[1] = nil
		tuplePool.Put(tp)
	}()
	(*tp)[0] = dirPath
	(*tp)[1] = fileName
	return store.seaweedfsDir.Pack(*tp)
}

func (store *FoundationDBStore) extractFileName(key fdb.Key) (string, error) {
	t, err := store.seaweedfsDir.Unpack(key)
	if err != nil {
		return "", fmt.Errorf("unpack key %v: %w", key, err)
	}
	if len(t) != 2 {
		return "", fmt.Errorf("tuple unexpected length (len=%d, expected 2) for key %v", len(t), key)
	}

	if fileName, ok := t[1].(string); ok {
		return fileName, nil
	}
	return "", fmt.Errorf("second element not a string (type=%T) for key %v", t[1], key)
}