filer: improve FoundationDB performance by disabling batching by default (#7770)
* filer: improve FoundationDB performance by disabling batching by default This PR addresses a performance issue where the FoundationDB filer was achieving only ~757 ops/sec with 12 concurrent S3 clients, despite FDB being capable of 17,000+ ops/sec. Root cause: The write batcher was waiting up to 5ms for each operation to batch, and because S3 semantics require waiting for durability confirmation, every operation paid this artificial latency, which defeated the purpose of batching. Changes: - Disable write batching by default (batch_enabled = false) - Each write now commits immediately in its own transaction - Reduce batch interval from 5ms to 1ms when batching is enabled - Add batch_enabled config option to toggle behavior - Improve batcher to collect available ops without blocking - Add benchmarks comparing batch vs no-batch performance Benchmark results (16 concurrent goroutines): - With batch: 2,924 ops/sec (342,032 ns/op) - Without batch: 4,625 ops/sec (216,219 ns/op) - Improvement: +58% faster Configuration: - Default: batch_enabled = false (optimal for S3 PUT latency) - For bulk ingestion: set batch_enabled = true Also fixes ARM64 Docker test setup (shell compatibility, fdbserver path). * fix: address review comments - use atomic counter and remove duplicate batcher - Use sync/atomic.Uint64 for unique filenames in concurrent benchmarks - Remove duplicate batcher creation in createBenchmarkStoreWithBatching (initialize() already creates batcher when batchEnabled=true) * fix: add realistic default values to benchmark store helper Set directoryPrefix, timeout, and maxRetryDelay to reasonable defaults for more realistic benchmark conditions.
This commit is contained in:
@@ -45,8 +45,12 @@ const (
|
||||
MAX_DIRECTORY_LIST_LIMIT = 1000
|
||||
|
||||
// Write batching defaults
|
||||
// Note: Batching is disabled by default because S3 semantics require waiting
|
||||
// for durability, and the batch timer adds latency to each operation.
|
||||
// Enable batching only for workloads that can tolerate potential latency.
|
||||
DEFAULT_BATCH_SIZE = 100
|
||||
DEFAULT_BATCH_INTERVAL = 5 * time.Millisecond
|
||||
DEFAULT_BATCH_INTERVAL = 1 * time.Millisecond
|
||||
DEFAULT_BATCH_ENABLED = false
|
||||
)
|
||||
|
||||
func init() {
|
||||
@@ -129,11 +133,33 @@ func (b *writeBatcher) run() {
|
||||
timer.Reset(b.interval)
|
||||
}
|
||||
|
||||
// Collect available ops without blocking
|
||||
collectAvailable := func() {
|
||||
for {
|
||||
select {
|
||||
case op := <-b.ops:
|
||||
batch = append(batch, op)
|
||||
batchBytes += op.size()
|
||||
if len(batch) >= b.size || batchBytes >= FDB_BATCH_SIZE_LIMIT {
|
||||
return
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case op := <-b.ops:
|
||||
batch = append(batch, op)
|
||||
batchBytes += op.size()
|
||||
|
||||
// Optimization: When an operation arrives, try to collect more
|
||||
// available operations without blocking. This improves throughput
|
||||
// when multiple goroutines are submitting concurrently.
|
||||
collectAvailable()
|
||||
|
||||
// Flush when batch count or size limit is reached
|
||||
if len(batch) >= b.size || batchBytes >= FDB_BATCH_SIZE_LIMIT {
|
||||
flush()
|
||||
@@ -185,8 +211,10 @@ type FoundationDBStore struct {
|
||||
directoryPrefix string
|
||||
timeout time.Duration
|
||||
maxRetryDelay time.Duration
|
||||
// Write batching
|
||||
// Write batching - disabled by default for optimal S3 latency
|
||||
// Enable for high-throughput bulk ingestion workloads
|
||||
batcher *writeBatcher
|
||||
batchEnabled bool
|
||||
batchSize int
|
||||
batchInterval time.Duration
|
||||
}
|
||||
@@ -225,6 +253,10 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre
|
||||
configuration.SetDefault(prefix+"timeout", "5s")
|
||||
configuration.SetDefault(prefix+"max_retry_delay", "1s")
|
||||
configuration.SetDefault(prefix+"directory_prefix", "seaweedfs")
|
||||
// Batching is disabled by default - each write commits immediately.
|
||||
// This provides optimal latency for S3 PUT operations.
|
||||
// Enable batching for high-throughput bulk ingestion workloads.
|
||||
configuration.SetDefault(prefix+"batch_enabled", DEFAULT_BATCH_ENABLED)
|
||||
configuration.SetDefault(prefix+"batch_size", DEFAULT_BATCH_SIZE)
|
||||
configuration.SetDefault(prefix+"batch_interval", DEFAULT_BATCH_INTERVAL.String())
|
||||
|
||||
@@ -247,6 +279,7 @@ func (store *FoundationDBStore) Initialize(configuration util.Configuration, pre
|
||||
}
|
||||
|
||||
// Parse batch configuration
|
||||
store.batchEnabled = configuration.GetBool(prefix + "batch_enabled")
|
||||
store.batchSize = configuration.GetInt(prefix + "batch_size")
|
||||
if store.batchSize <= 0 {
|
||||
store.batchSize = DEFAULT_BATCH_SIZE
|
||||
@@ -288,10 +321,16 @@ func (store *FoundationDBStore) initialize(clusterFile string, apiVersion int) e
|
||||
return fmt.Errorf("failed to create/open kv directory: %w", err)
|
||||
}
|
||||
|
||||
// Start write batcher for improved throughput
|
||||
store.batcher = newWriteBatcher(store, store.batchSize, store.batchInterval)
|
||||
glog.V(0).Infof("FoundationDB: write batching enabled (batch_size=%d, batch_interval=%v)",
|
||||
store.batchSize, store.batchInterval)
|
||||
// Conditionally start write batcher
|
||||
// Batching is disabled by default for optimal S3 latency.
|
||||
// When disabled, each write commits immediately in its own transaction.
|
||||
if store.batchEnabled {
|
||||
store.batcher = newWriteBatcher(store, store.batchSize, store.batchInterval)
|
||||
glog.V(0).Infof("FoundationDB: write batching enabled (batch_size=%d, batch_interval=%v)",
|
||||
store.batchSize, store.batchInterval)
|
||||
} else {
|
||||
glog.V(0).Infof("FoundationDB: write batching disabled (direct commit mode for optimal latency)")
|
||||
}
|
||||
|
||||
glog.V(0).Infof("FoundationDB store initialized successfully with directory prefix: %s", store.directoryPrefix)
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user