Files
seaweedFS/weed/util/log_buffer/log_buffer.go
Chris Lu ca84a8a713 S3: Directly read write volume servers (#7481)
* Lazy Versioning Check, Conditional SSE Entry Fetch, HEAD Request Optimization

* revert

Reverted the conditional versioning check to always check versioning status
Reverted the conditional SSE entry fetch to always fetch entry metadata
Reverted the conditional versioning check to always check versioning status
Reverted the conditional SSE entry fetch to always fetch entry metadata

* Lazy Entry Fetch for SSE, Skip Conditional Header Check

* SSE-KMS headers are present, this is not an SSE-C request (mutually exclusive)

* SSE-C is mutually exclusive with SSE-S3 and SSE-KMS

* refactor

* Removed Premature Mutual Exclusivity Check

* check for the presence of the X-Amz-Server-Side-Encryption header

* not used

* fmt

* directly read write volume servers

* HTTP Range Request Support

* set header

* md5

* copy object

* fix sse

* fmt

* implement sse

* sse continue

* fixed the suffix range bug (bytes=-N for "last N bytes")

* debug logs

* Missing PartsCount Header

* profiling

* url encoding

* test_multipart_get_part

* headers

* debug

* adjust log level

* handle part number

* Update s3api_object_handlers.go

* nil safety

* set ModifiedTsNs

* remove

* nil check

* fix sse header

* same logic as filer

* decode values

* decode ivBase64

* s3: Fix SSE decryption JWT authentication and streaming errors

Critical fix for SSE (Server-Side Encryption) test failures:

1. **JWT Authentication Bug** (Root Cause):
   - Changed from GenJwtForFilerServer to GenJwtForVolumeServer
   - S3 API now uses correct JWT when directly reading from volume servers
   - Matches filer's authentication pattern for direct volume access
   - Fixes 'unexpected EOF' and 500 errors in SSE tests

2. **Streaming Error Handling**:
   - Added error propagation in getEncryptedStreamFromVolumes goroutine
   - Use CloseWithError() to properly communicate stream failures
   - Added debug logging for streaming errors

3. **Response Header Timing**:
   - Removed premature WriteHeader(http.StatusOK) call
   - Let Go's http package write status automatically on first write
   - Prevents header lock when errors occur during streaming

4. **Enhanced SSE Decryption Debugging**:
   - Added IV/Key validation and logging for SSE-C, SSE-KMS, SSE-S3
   - Better error messages for missing or invalid encryption metadata
   - Added glog.V(2) debugging for decryption setup

This fixes SSE integration test failures where encrypted objects
could not be retrieved due to volume server authentication failures.
The JWT bug was causing volume servers to reject requests, resulting
in truncated/empty streams (EOF) or internal errors.

* s3: Fix SSE multipart upload metadata preservation

Critical fix for SSE multipart upload test failures (SSE-C and SSE-KMS):

**Root Cause - Incomplete SSE Metadata Copying**:
The old code only tried to copy 'SeaweedFSSSEKMSKey' from the first
part to the completed object. This had TWO bugs:

1. **Wrong Constant Name** (Key Mismatch Bug):
   - Storage uses: SeaweedFSSSEKMSKeyHeader = 'X-SeaweedFS-SSE-KMS-Key'
   - Old code read: SeaweedFSSSEKMSKey = 'x-seaweedfs-sse-kms-key'
   - Result: SSE-KMS metadata was NEVER copied → 500 errors

2. **Missing SSE-C and SSE-S3 Headers**:
   - SSE-C requires: IV, Algorithm, KeyMD5
   - SSE-S3 requires: encrypted key data + standard headers
   - Old code: copied nothing for SSE-C/SSE-S3 → decryption failures

**Fix - Complete SSE Header Preservation**:
Now copies ALL SSE headers from first part to completed object:

- SSE-C: SeaweedFSSSEIV, CustomerAlgorithm, CustomerKeyMD5
- SSE-KMS: SeaweedFSSSEKMSKeyHeader, AwsKmsKeyId, ServerSideEncryption
- SSE-S3: SeaweedFSSSES3Key, ServerSideEncryption

Applied consistently to all 3 code paths:
1. Versioned buckets (creates version file)
2. Suspended versioning (creates main object with null versionId)
3. Non-versioned buckets (creates main object)

**Why This Is Correct**:
The headers copied EXACTLY match what putToFiler stores during part
upload (lines 496-521 in s3api_object_handlers_put.go). This ensures
detectPrimarySSEType() can correctly identify encrypted multipart
objects and trigger inline decryption with proper metadata.

Fixes: TestSSEMultipartUploadIntegration (SSE-C and SSE-KMS subtests)

* s3: Add debug logging for versioning state diagnosis

Temporary debug logging to diagnose test_versioning_obj_plain_null_version_overwrite_suspended failure.

Added glog.V(0) logging to show:
1. setBucketVersioningStatus: when versioning status is changed
2. PutObjectHandler: what versioning state is detected (Enabled/Suspended/none)
3. PutObjectHandler: which code path is taken (putVersionedObject vs putSuspendedVersioningObject)

This will help identify if:
- The versioning status is being set correctly in bucket config
- The cache is returning stale/incorrect versioning state
- The switch statement is correctly routing to suspended vs enabled handlers

* s3: Enhanced versioning state tracing for suspended versioning diagnosis

Added comprehensive logging across the entire versioning state flow:

PutBucketVersioningHandler:
- Log requested status (Enabled/Suspended)
- Log when calling setBucketVersioningStatus
- Log success/failure of status change

setBucketVersioningStatus:
- Log bucket and status being set
- Log when config is updated
- Log completion with error code

updateBucketConfig:
- Log versioning state being written to cache
- Immediate cache verification after Set
- Log if cache verification fails

getVersioningState:
- Log bucket name and state being returned
- Log if object lock forces VersioningEnabled
- Log errors

This will reveal:
1. If PutBucketVersioning(Suspended) is reaching the handler
2. If the cache update succeeds
3. What state getVersioningState returns during PUT
4. Any cache consistency issues

Expected to show why bucket still reports 'Enabled' after 'Suspended' call.

* s3: Add SSE chunk detection debugging for multipart uploads

Added comprehensive logging to diagnose why TestSSEMultipartUploadIntegration fails:

detectPrimarySSEType now logs:
1. Total chunk count and extended header count
2. All extended headers with 'sse'/'SSE'/'encryption' in the name
3. For each chunk: index, SseType, and whether it has metadata
4. Final SSE type counts (SSE-C, SSE-KMS, SSE-S3)

This will reveal if:
- Chunks are missing SSE metadata after multipart completion
- Extended headers are copied correctly from first part
- The SSE detection logic is working correctly

Expected to show if chunks have SseType=0 (none) or proper SSE types set.

* s3: Trace SSE chunk metadata through multipart completion and retrieval

Added end-to-end logging to track SSE chunk metadata lifecycle:

**During Multipart Completion (filer_multipart.go)**:
1. Log finalParts chunks BEFORE mkFile - shows SseType and metadata
2. Log versionEntry.Chunks INSIDE mkFile callback - shows if mkFile preserves SSE info
3. Log success after mkFile completes

**During GET Retrieval (s3api_object_handlers.go)**:
1. Log retrieved entry chunks - shows SseType and metadata after retrieval
2. Log detected SSE type result

This will reveal at which point SSE chunk metadata is lost:
- If finalParts have SSE metadata but versionEntry.Chunks don't → mkFile bug
- If versionEntry.Chunks have SSE metadata but retrieved chunks don't → storage/retrieval bug
- If chunks never have SSE metadata → multipart completion SSE processing bug

Expected to show chunks with SseType=NONE during retrieval even though
they were created with proper SseType during multipart completion.

* s3: Fix SSE-C multipart IV base64 decoding bug

**Critical Bug Found**: SSE-C multipart uploads were failing because:

Root Cause:
- entry.Extended[SeaweedFSSSEIV] stores base64-encoded IV (24 bytes for 16-byte IV)
- SerializeSSECMetadata expects raw IV bytes (16 bytes)
- During multipart completion, we were passing base64 IV directly → serialization error

Error Message:
"Failed to serialize SSE-C metadata for chunk in part X: invalid IV length: expected 16 bytes, got 24"

Fix:
- Base64-decode IV before passing to SerializeSSECMetadata
- Added error handling for decode failures

Impact:
- SSE-C multipart uploads will now correctly serialize chunk metadata
- Chunks will have proper SSE metadata for decryption during GET

This fixes the SSE-C subtest of TestSSEMultipartUploadIntegration.
SSE-KMS still has a separate issue (error code 23) being investigated.

* fixes

* kms sse

* handle retry if not found in .versions folder and should read the normal object

* quick check (no retries) to see if the .versions/ directory exists

* skip retry if object is not found

* explicit update to avoid sync delay

* fix map update lock

* Remove fmt.Printf debug statements

* Fix SSE-KMS multipart base IV fallback to fail instead of regenerating

* fmt

* Fix ACL grants storage logic

* header handling

* nil handling

* range read for sse content

* test range requests for sse objects

* fmt

* unused code

* upload in chunks

* header case

* fix url

* bucket policy error vs bucket not found

* jwt handling

* fmt

* jwt in request header

* Optimize Case-Insensitive Prefix Check

* dead code

* Eliminated Unnecessary Stream Prefetch for Multipart SSE

* range sse

* sse

* refactor

* context

* fmt

* fix type

* fix SSE-C IV Mismatch

* Fix Headers Being Set After WriteHeader

* fix url parsing

* propergate sse headers

* multipart sse-s3

* aws sig v4 authen

* sse kms

* set content range

* better errors

* Update s3api_object_handlers_copy.go

* Update s3api_object_handlers.go

* Update s3api_object_handlers.go

* avoid magic number

* clean up

* Update s3api_bucket_policy_handlers.go

* fix url parsing

* context

* data and metadata both use background context

* adjust the offset

* SSE Range Request IV Calculation

* adjust logs

* IV relative to offset in each part, not the whole file

* collect logs

* offset

* fix offset

* fix url

* logs

* variable

* jwt

* Multipart ETag semantics: conditionally set object-level Md5 for single-chunk uploads only.

* sse

* adjust IV and offset

* multipart boundaries

* ensures PUT and GET operations return consistent ETags

* Metadata Header Case

* CommonPrefixes Sorting with URL Encoding

* always sort

* remove the extra PathUnescape call

* fix the multipart get part ETag

* the FileChunk is created without setting ModifiedTsNs

* Sort CommonPrefixes lexicographically to match AWS S3 behavior

* set md5 for multipart uploads

* prevents any potential data loss or corruption in the small-file inline storage path

* compiles correctly

* decryptedReader will now be properly closed after use

* Fixed URL encoding and sort order for CommonPrefixes

* Update s3api_object_handlers_list.go

* SSE-x Chunk View Decryption

* Different IV offset calculations for single-part vs multipart objects

* still too verbose in logs

* less logs

* ensure correct conversion

* fix listing

* nil check

* minor fixes

* nil check

* single character delimiter

* optimize

* range on empty object or zero-length

* correct IV based on its position within that part, not its position in the entire object

* adjust offset

* offset

Fetch FULL encrypted chunk (not just the range)
Adjust IV by PartOffset/ChunkOffset only
Decrypt full chunk
Skip in the DECRYPTED stream to reach OffsetInChunk

* look breaking

* refactor

* error on no content

* handle intra-block byte skipping

* Incomplete HTTP Response Error Handling

* multipart SSE

* Update s3api_object_handlers.go

* address comments

* less logs

* handling directory

* Optimized rejectDirectoryObjectWithoutSlash() to avoid unnecessary lookups

* Revert "handling directory"

This reverts commit 3a335f0ac33c63f51975abc63c40e5328857a74b.

* constant

* Consolidate nil entry checks in GetObjectHandler

* add range tests

* Consolidate redundant nil entry checks in HeadObjectHandler

* adjust logs

* SSE type

* large files

* large files

Reverted the plain-object range test

* ErrNoEncryptionConfig

* Fixed SSERangeReader Infinite Loop Vulnerability

* Fixed SSE-KMS Multipart ChunkReader HTTP Body Leak

* handle empty directory in S3, added PyArrow tests

* purge unused code

* Update s3_parquet_test.py

* Update requirements.txt

* According to S3 specifications, when both partNumber and Range are present, the Range should apply within the selected part's boundaries, not to the full object.

* handle errors

* errors after writing header

* https

* fix: Wait for volume assignment readiness before running Parquet tests

The test-implicit-dir-with-server test was failing with an Internal Error
because volume assignment was not ready when tests started. This fix adds
a check that attempts a volume assignment and waits for it to succeed
before proceeding with tests.

This ensures that:
1. Volume servers are registered with the master
2. Volume growth is triggered if needed
3. The system can successfully assign volumes for writes

Fixes the timeout issue where boto3 would retry 4 times and fail with
'We encountered an internal error, please try again.'

* sse tests

* store derived IV

* fix: Clean up gRPC ports between tests to prevent port conflicts

The second test (test-implicit-dir-with-server) was failing because the
volume server's gRPC port (18080 = VOLUME_PORT + 10000) was still in use
from the first test. The cleanup code only killed HTTP port processes,
not gRPC port processes.

Added cleanup for gRPC ports in all stop targets:
- Master gRPC: MASTER_PORT + 10000 (19333)
- Volume gRPC: VOLUME_PORT + 10000 (18080)
- Filer gRPC: FILER_PORT + 10000 (18888)

This ensures clean state between test runs in CI.

* add import

* address comments

* docs: Add placeholder documentation files for Parquet test suite

Added three missing documentation files referenced in test/s3/parquet/README.md:

1. TEST_COVERAGE.md - Documents 43 total test cases (17 Go unit tests,
   6 Python integration tests, 20 Python end-to-end tests)

2. FINAL_ROOT_CAUSE_ANALYSIS.md - Explains the s3fs compatibility issue
   with PyArrow, the implicit directory problem, and how the fix works

3. MINIO_DIRECTORY_HANDLING.md - Compares MinIO's directory handling
   approach with SeaweedFS's implementation

Each file contains:
- Title and overview
- Key technical details relevant to the topic
- TODO sections for future expansion

These placeholder files resolve the broken README links and provide
structure for future detailed documentation.

* clean up if metadata operation failed

* Update s3_parquet_test.py

* clean up

* Update Makefile

* Update s3_parquet_test.py

* Update Makefile

* Handle ivSkip for non-block-aligned offsets

* Update README.md

* stop volume server faster

* stop volume server in 1 second

* different IV for each chunk in SSE-S3 and SSE-KMS

* clean up if fails

* testing upload

* error propagation

* fmt

* simplify

* fix copying

* less logs

* endian

* Added marshaling error handling

* handling invalid ranges

* error handling for adding to log buffer

* fix logging

* avoid returning too quickly and ensure proper cleaning up

* Activity Tracking for Disk Reads

* Cleanup Unused Parameters

* Activity Tracking for Kafka Publishers

* Proper Test Error Reporting

* refactoring

* less logs

* less logs

* go fmt

* guard it with if entry.Attributes.TtlSec > 0 to match the pattern used elsewhere.

* Handle bucket-default encryption config errors explicitly for multipart

* consistent activity tracking

* obsolete code for s3 on filer read/write handlers

* Update weed/s3api/s3api_object_handlers_list.go

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
2025-11-18 23:18:35 -08:00

894 lines
29 KiB
Go

package log_buffer
import (
"bytes"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const BufferSize = 8 * 1024 * 1024
const PreviousBufferCount = 32
// Errors that can be returned by log buffer operations
var (
// ErrBufferCorrupted indicates the log buffer contains corrupted data
ErrBufferCorrupted = fmt.Errorf("log buffer is corrupted")
)
type dataToFlush struct {
startTime time.Time
stopTime time.Time
data *bytes.Buffer
minOffset int64
maxOffset int64
done chan struct{} // Signal when flush completes
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
type EachLogEntryWithOffsetFuncType func(logEntry *filer_pb.LogEntry, offset int64) (isDone bool, err error)
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte, minOffset, maxOffset int64)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
// DiskChunkCache caches chunks of historical data read from disk
type DiskChunkCache struct {
mu sync.RWMutex
chunks map[int64]*CachedDiskChunk // Key: chunk start offset (aligned to chunkSize)
maxChunks int // Maximum number of chunks to cache
}
// CachedDiskChunk represents a cached chunk of disk data
type CachedDiskChunk struct {
startOffset int64
endOffset int64
messages []*filer_pb.LogEntry
lastAccess time.Time
}
type LogBuffer struct {
LastFlushTsNs int64
name string
prevBuffers *SealedBuffers
buf []byte
offset int64 // Last offset in current buffer (endOffset)
bufferStartOffset int64 // First offset in current buffer
idx []int
pos int
startTime time.Time
stopTime time.Time
lastFlushDataTime time.Time
sizeBuf []byte
flushInterval time.Duration
flushFn LogFlushFuncType
ReadFromDiskFn LogReadFromDiskFuncType
notifyFn func()
// Per-subscriber notification channels for instant wake-up
subscribersMu sync.RWMutex
subscribers map[string]chan struct{} // subscriberID -> notification channel
isStopping *atomic.Bool
isAllFlushed bool
flushChan chan *dataToFlush
LastTsNs atomic.Int64
// Offset range tracking for Kafka integration
minOffset int64
maxOffset int64
hasOffsets bool
lastFlushedOffset atomic.Int64 // Highest offset that has been flushed to disk (-1 = nothing flushed yet)
lastFlushTsNs atomic.Int64 // Latest timestamp that has been flushed to disk (0 = nothing flushed yet)
// Disk chunk cache for historical data reads
diskChunkCache *DiskChunkCache
sync.RWMutex
}
func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer {
lb := &LogBuffer{
name: name,
prevBuffers: newSealedBuffers(PreviousBufferCount),
buf: make([]byte, BufferSize),
sizeBuf: make([]byte, 4),
flushInterval: flushInterval,
flushFn: flushFn,
ReadFromDiskFn: readFromDiskFn,
notifyFn: notifyFn,
subscribers: make(map[string]chan struct{}),
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
offset: 0, // Will be initialized from existing data if available
diskChunkCache: &DiskChunkCache{
chunks: make(map[int64]*CachedDiskChunk),
maxChunks: 16, // Cache up to 16 chunks (configurable)
},
}
lb.lastFlushedOffset.Store(-1) // Nothing flushed to disk yet
go lb.loopFlush()
go lb.loopInterval()
return lb
}
// RegisterSubscriber registers a subscriber for instant notifications when data is written
// Returns a channel that will receive notifications (<1ms latency)
func (logBuffer *LogBuffer) RegisterSubscriber(subscriberID string) chan struct{} {
logBuffer.subscribersMu.Lock()
defer logBuffer.subscribersMu.Unlock()
// Check if already registered
if existingChan, exists := logBuffer.subscribers[subscriberID]; exists {
return existingChan
}
// Create buffered channel (size 1) so notifications never block
notifyChan := make(chan struct{}, 1)
logBuffer.subscribers[subscriberID] = notifyChan
return notifyChan
}
// UnregisterSubscriber removes a subscriber and closes its notification channel
func (logBuffer *LogBuffer) UnregisterSubscriber(subscriberID string) {
logBuffer.subscribersMu.Lock()
defer logBuffer.subscribersMu.Unlock()
if ch, exists := logBuffer.subscribers[subscriberID]; exists {
close(ch)
delete(logBuffer.subscribers, subscriberID)
}
}
// IsOffsetInMemory checks if the given offset is available in the in-memory buffer
// Returns true if:
// 1. Offset is newer than what's been flushed to disk (must be in memory)
// 2. Offset is in current buffer or previous buffers (may be flushed but still in memory)
// Returns false if offset is older than memory buffers (only on disk)
func (logBuffer *LogBuffer) IsOffsetInMemory(offset int64) bool {
logBuffer.RLock()
defer logBuffer.RUnlock()
// Check if we're tracking offsets at all
if !logBuffer.hasOffsets {
return false // No offsets tracked yet
}
// OPTIMIZATION: If offset is newer than what's been flushed to disk,
// it MUST be in memory (not written to disk yet)
lastFlushed := logBuffer.lastFlushedOffset.Load()
if lastFlushed >= 0 && offset > lastFlushed {
return true
}
// Check if offset is in current buffer range AND buffer has data
// (data can be both on disk AND in memory during flush window)
if offset >= logBuffer.bufferStartOffset && offset <= logBuffer.offset {
// CRITICAL: Check if buffer actually has data (pos > 0)
// After flush, pos=0 but range is still valid - data is on disk, not in memory
if logBuffer.pos > 0 {
return true
}
// Buffer is empty (just flushed) - data is on disk
return false
}
// Check if offset is in previous buffers AND they have data
for _, buf := range logBuffer.prevBuffers.buffers {
if offset >= buf.startOffset && offset <= buf.offset {
// Check if prevBuffer actually has data
if buf.size > 0 {
return true
}
// Buffer is empty (flushed) - data is on disk
return false
}
}
// Offset is older than memory buffers - only available on disk
return false
}
// notifySubscribers sends notifications to all registered subscribers
// Non-blocking: uses select with default to avoid blocking on full channels
func (logBuffer *LogBuffer) notifySubscribers() {
logBuffer.subscribersMu.RLock()
defer logBuffer.subscribersMu.RUnlock()
if len(logBuffer.subscribers) == 0 {
return // No subscribers, skip notification
}
for _, notifyChan := range logBuffer.subscribers {
select {
case notifyChan <- struct{}{}:
// Notification sent successfully
default:
// Channel full - subscriber hasn't consumed previous notification yet
// This is OK because one notification is sufficient to wake the subscriber
}
}
}
// InitializeOffsetFromExistingData initializes the offset counter from existing data on disk
// This should be called after LogBuffer creation to ensure offset continuity on restart
func (logBuffer *LogBuffer) InitializeOffsetFromExistingData(getHighestOffsetFn func() (int64, error)) error {
if getHighestOffsetFn == nil {
return nil // No initialization function provided
}
highestOffset, err := getHighestOffsetFn()
if err != nil {
return nil // Continue with offset 0 if we can't read existing data
}
if highestOffset >= 0 {
// Set the next offset to be one after the highest existing offset
nextOffset := highestOffset + 1
logBuffer.offset = nextOffset
// bufferStartOffset should match offset after initialization
// This ensures that reads for old offsets (0...highestOffset) will trigger disk reads
// New data written after this will start at nextOffset
logBuffer.bufferStartOffset = nextOffset
// CRITICAL: Track that data [0...highestOffset] is on disk
logBuffer.lastFlushedOffset.Store(highestOffset)
// Set lastFlushedTime to current time (we know data up to highestOffset is on disk)
logBuffer.lastFlushTsNs.Store(time.Now().UnixNano())
} else {
logBuffer.bufferStartOffset = 0 // Start from offset 0
// No data on disk yet
}
return nil
}
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage) error {
return logBuffer.AddDataToBuffer(message.Key, message.Value, message.TsNs)
}
// AddLogEntryToBuffer directly adds a LogEntry to the buffer, preserving offset information
func (logBuffer *LogBuffer) AddLogEntryToBuffer(logEntry *filer_pb.LogEntry) error {
var toFlush *dataToFlush
var marshalErr error
logBuffer.Lock()
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
// Only notify if there was no error
if marshalErr == nil {
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
// Notify all registered subscribers instantly (<1ms latency)
logBuffer.notifySubscribers()
}
}()
processingTsNs := logEntry.TsNs
ts := time.Unix(0, processingTsNs)
// Handle timestamp collision inside lock (rare case)
if logBuffer.LastTsNs.Load() >= processingTsNs {
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
// Re-marshal with corrected timestamp
logEntry.TsNs = processingTsNs
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
logEntryData, err := proto.Marshal(logEntry)
if err != nil {
marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err)
glog.Errorf("%v", marshalErr)
return marshalErr
}
size := len(logEntryData)
if logBuffer.pos == 0 {
logBuffer.startTime = ts
// Reset offset tracking for new buffer
logBuffer.hasOffsets = false
}
// Track offset ranges for Kafka integration
// Use >= 0 to include offset 0 (first message in a topic)
if logEntry.Offset >= 0 {
if !logBuffer.hasOffsets {
logBuffer.minOffset = logEntry.Offset
logBuffer.maxOffset = logEntry.Offset
logBuffer.hasOffsets = true
} else {
if logEntry.Offset < logBuffer.minOffset {
logBuffer.minOffset = logEntry.Offset
}
if logEntry.Offset > logBuffer.maxOffset {
logBuffer.maxOffset = logEntry.Offset
}
}
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
// Validate size to prevent integer overflow in computation BEFORE allocation
const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size)
glog.Errorf("%v", marshalErr)
return marshalErr
}
// Safe to compute now that we've validated size is in valid range
newSize := 2*size + 4
logBuffer.buf = make([]byte, newSize)
}
}
logBuffer.stopTime = ts
logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
logBuffer.offset++
return nil
}
func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64) error {
// PERFORMANCE OPTIMIZATION: Pre-process expensive operations OUTSIDE the lock
var ts time.Time
if processingTsNs == 0 {
ts = time.Now()
processingTsNs = ts.UnixNano()
} else {
ts = time.Unix(0, processingTsNs)
}
logEntry := &filer_pb.LogEntry{
TsNs: processingTsNs, // Will be updated if needed
PartitionKeyHash: util.HashToInt32(partitionKey),
Data: data,
Key: partitionKey,
}
var toFlush *dataToFlush
var marshalErr error
logBuffer.Lock()
defer func() {
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
// Only notify if there was no error
if marshalErr == nil {
if logBuffer.notifyFn != nil {
logBuffer.notifyFn()
}
// Notify all registered subscribers instantly (<1ms latency)
logBuffer.notifySubscribers()
}
}()
// Handle timestamp collision inside lock (rare case)
if logBuffer.LastTsNs.Load() >= processingTsNs {
processingTsNs = logBuffer.LastTsNs.Add(1)
ts = time.Unix(0, processingTsNs)
logEntry.TsNs = processingTsNs
} else {
logBuffer.LastTsNs.Store(processingTsNs)
}
// Set the offset in the LogEntry before marshaling
// This ensures the flushed data contains the correct offset information
// Note: This also enables AddToBuffer to work correctly with Kafka-style offset-based reads
logEntry.Offset = logBuffer.offset
// Marshal with correct timestamp and offset
logEntryData, err := proto.Marshal(logEntry)
if err != nil {
marshalErr = fmt.Errorf("failed to marshal LogEntry: %w", err)
glog.Errorf("%v", marshalErr)
return marshalErr
}
size := len(logEntryData)
if logBuffer.pos == 0 {
logBuffer.startTime = ts
// Reset offset tracking for new buffer
logBuffer.hasOffsets = false
}
// Track offset ranges for Kafka integration
// Track the current offset being written
if !logBuffer.hasOffsets {
logBuffer.minOffset = logBuffer.offset
logBuffer.maxOffset = logBuffer.offset
logBuffer.hasOffsets = true
} else {
if logBuffer.offset < logBuffer.minOffset {
logBuffer.minOffset = logBuffer.offset
}
if logBuffer.offset > logBuffer.maxOffset {
logBuffer.maxOffset = logBuffer.offset
}
}
if logBuffer.startTime.Add(logBuffer.flushInterval).Before(ts) || len(logBuffer.buf)-logBuffer.pos < size+4 {
toFlush = logBuffer.copyToFlush()
logBuffer.startTime = ts
if len(logBuffer.buf) < size+4 {
// Validate size to prevent integer overflow in computation BEFORE allocation
const maxBufferSize = 1 << 30 // 1 GiB practical limit
// Ensure 2*size + 4 won't overflow int and stays within practical bounds
if size < 0 || size > (math.MaxInt-4)/2 || size > (maxBufferSize-4)/2 {
marshalErr = fmt.Errorf("message size %d exceeds maximum allowed size", size)
glog.Errorf("%v", marshalErr)
return marshalErr
}
// Safe to compute now that we've validated size is in valid range
newSize := 2*size + 4
logBuffer.buf = make([]byte, newSize)
}
}
logBuffer.stopTime = ts
logBuffer.idx = append(logBuffer.idx, logBuffer.pos)
util.Uint32toBytes(logBuffer.sizeBuf, uint32(size))
copy(logBuffer.buf[logBuffer.pos:logBuffer.pos+4], logBuffer.sizeBuf)
copy(logBuffer.buf[logBuffer.pos+4:logBuffer.pos+4+size], logEntryData)
logBuffer.pos += size + 4
logBuffer.offset++
return nil
}
func (logBuffer *LogBuffer) IsStopping() bool {
return logBuffer.isStopping.Load()
}
// ForceFlush immediately flushes the current buffer content and WAITS for completion
// This is useful for critical topics that need immediate persistence
// CRITICAL: This function is now SYNCHRONOUS - it blocks until the flush completes
func (logBuffer *LogBuffer) ForceFlush() {
if logBuffer.isStopping.Load() {
return // Don't flush if we're shutting down
}
logBuffer.Lock()
toFlush := logBuffer.copyToFlushWithCallback()
logBuffer.Unlock()
if toFlush != nil {
// Send to flush channel (with reasonable timeout)
select {
case logBuffer.flushChan <- toFlush:
// Successfully queued for flush - now WAIT for it to complete
select {
case <-toFlush.done:
// Flush completed successfully
case <-time.After(5 * time.Second):
// Timeout waiting for flush - this shouldn't happen
}
case <-time.After(2 * time.Second):
// If flush channel is still blocked after 2s, something is wrong
}
}
}
// ShutdownLogBuffer flushes the buffer and stops the log buffer
func (logBuffer *LogBuffer) ShutdownLogBuffer() {
isAlreadyStopped := logBuffer.isStopping.Swap(true)
if isAlreadyStopped {
return
}
toFlush := logBuffer.copyToFlush()
logBuffer.flushChan <- toFlush
close(logBuffer.flushChan)
}
// IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
func (logBuffer *LogBuffer) IsAllFlushed() bool {
return logBuffer.isAllFlushed
}
func (logBuffer *LogBuffer) loopFlush() {
for d := range logBuffer.flushChan {
if d != nil {
logBuffer.flushFn(logBuffer, d.startTime, d.stopTime, d.data.Bytes(), d.minOffset, d.maxOffset)
d.releaseMemory()
// local logbuffer is different from aggregate logbuffer here
logBuffer.lastFlushDataTime = d.stopTime
// CRITICAL: Track what's been flushed to disk for both offset-based and time-based reads
// Use >= 0 to include offset 0 (first message in a topic)
if d.maxOffset >= 0 {
logBuffer.lastFlushedOffset.Store(d.maxOffset)
}
if !d.stopTime.IsZero() {
logBuffer.lastFlushTsNs.Store(d.stopTime.UnixNano())
}
// Signal completion if there's a callback channel
if d.done != nil {
close(d.done)
}
}
}
logBuffer.isAllFlushed = true
}
func (logBuffer *LogBuffer) loopInterval() {
for !logBuffer.IsStopping() {
time.Sleep(logBuffer.flushInterval)
if logBuffer.IsStopping() {
return
}
logBuffer.Lock()
toFlush := logBuffer.copyToFlush()
logBuffer.Unlock()
if toFlush != nil {
logBuffer.flushChan <- toFlush
}
}
}
func (logBuffer *LogBuffer) copyToFlush() *dataToFlush {
return logBuffer.copyToFlushInternal(false)
}
func (logBuffer *LogBuffer) copyToFlushWithCallback() *dataToFlush {
return logBuffer.copyToFlushInternal(true)
}
func (logBuffer *LogBuffer) copyToFlushInternal(withCallback bool) *dataToFlush {
if logBuffer.pos > 0 {
var d *dataToFlush
if logBuffer.flushFn != nil {
d = &dataToFlush{
startTime: logBuffer.startTime,
stopTime: logBuffer.stopTime,
data: copiedBytes(logBuffer.buf[:logBuffer.pos]),
minOffset: logBuffer.minOffset,
maxOffset: logBuffer.maxOffset,
}
// Add callback channel for synchronous ForceFlush
if withCallback {
d.done = make(chan struct{})
}
} else {
logBuffer.lastFlushDataTime = logBuffer.stopTime
}
// CRITICAL: logBuffer.offset is the "next offset to assign", so last offset in buffer is offset-1
lastOffsetInBuffer := logBuffer.offset - 1
logBuffer.buf = logBuffer.prevBuffers.SealBuffer(logBuffer.startTime, logBuffer.stopTime, logBuffer.buf, logBuffer.pos, logBuffer.bufferStartOffset, lastOffsetInBuffer)
// Use zero time (time.Time{}) not epoch time (time.Unix(0,0))
// Epoch time (1970) breaks time-based reads after flush
logBuffer.startTime = time.Time{}
logBuffer.stopTime = time.Time{}
logBuffer.pos = 0
logBuffer.idx = logBuffer.idx[:0]
// DON'T increment offset - it's already pointing to the next offset!
// logBuffer.offset++ // REMOVED - this was causing offset gaps!
logBuffer.bufferStartOffset = logBuffer.offset // Next buffer starts at current offset (which is already the next one)
// Reset offset tracking
logBuffer.hasOffsets = false
logBuffer.minOffset = 0
logBuffer.maxOffset = 0
// Invalidate disk cache chunks after flush
// The cache may contain stale data from before this flush
// Invalidating ensures consumers will re-read fresh data from disk after flush
logBuffer.invalidateAllDiskCacheChunks()
return d
}
return nil
}
// invalidateAllDiskCacheChunks clears all cached disk chunks
// This should be called after a buffer flush to ensure consumers read fresh data from disk
func (logBuffer *LogBuffer) invalidateAllDiskCacheChunks() {
logBuffer.diskChunkCache.mu.Lock()
defer logBuffer.diskChunkCache.mu.Unlock()
if len(logBuffer.diskChunkCache.chunks) > 0 {
logBuffer.diskChunkCache.chunks = make(map[int64]*CachedDiskChunk)
}
}
func (logBuffer *LogBuffer) GetEarliestTime() time.Time {
return logBuffer.startTime
}
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition {
return MessagePosition{
Time: logBuffer.startTime,
Offset: logBuffer.offset,
}
}
// GetLastFlushTsNs returns the latest flushed timestamp in Unix nanoseconds.
// Returns 0 if nothing has been flushed yet.
func (logBuffer *LogBuffer) GetLastFlushTsNs() int64 {
return logBuffer.lastFlushTsNs.Load()
}
func (d *dataToFlush) releaseMemory() {
d.data.Reset()
bufferPool.Put(d.data)
}
func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bufferCopy *bytes.Buffer, batchIndex int64, err error) {
logBuffer.RLock()
defer logBuffer.RUnlock()
isOffsetBased := lastReadPosition.IsOffsetBased
// For offset-based subscriptions, use offset comparisons, not time comparisons!
if isOffsetBased {
requestedOffset := lastReadPosition.Offset
// Check if the requested offset is in the current buffer range
if requestedOffset >= logBuffer.bufferStartOffset && requestedOffset <= logBuffer.offset {
// If current buffer is empty (pos=0), check if data is on disk or not yet written
if logBuffer.pos == 0 {
// If buffer is empty but offset range covers the request,
// it means data was in memory and has been flushed/moved out.
// The bufferStartOffset advancing to cover this offset proves data existed.
//
// Three cases:
// 1. requestedOffset < logBuffer.offset: Data was here, now flushed
// 2. requestedOffset == logBuffer.offset && bufferStartOffset > 0: Buffer advanced, data flushed
// 3. requestedOffset == logBuffer.offset && bufferStartOffset == 0: Initial state - try disk first!
//
// Cases 1 & 2: try disk read
// Case 3: try disk read (historical data might exist)
if requestedOffset < logBuffer.offset {
// Data was in the buffer range but buffer is now empty = flushed to disk
return nil, -2, ResumeFromDiskError
}
// requestedOffset == logBuffer.offset: Current position
// CRITICAL: For subscribers starting from offset 0, try disk read first
// (historical data might exist from previous runs)
if requestedOffset == 0 && logBuffer.bufferStartOffset == 0 && logBuffer.offset == 0 {
// Initial state: try disk read before waiting for new data
return nil, -2, ResumeFromDiskError
}
// Otherwise, wait for new data to arrive
return nil, logBuffer.offset, nil
}
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
// Check previous buffers for the requested offset
for _, buf := range logBuffer.prevBuffers.buffers {
if requestedOffset >= buf.startOffset && requestedOffset <= buf.offset {
// If prevBuffer is empty, it means the data was flushed to disk
// (prevBuffers are created when buffer is flushed)
if buf.size == 0 {
// Empty prevBuffer covering this offset means data was flushed
return nil, -2, ResumeFromDiskError
}
return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
}
// Offset not found in any buffer
if requestedOffset < logBuffer.bufferStartOffset {
// Data not in current buffers - must be on disk (flushed or never existed)
// Return ResumeFromDiskError to trigger disk read
return nil, -2, ResumeFromDiskError
}
if requestedOffset > logBuffer.offset {
// Future data, not available yet
return nil, logBuffer.offset, nil
}
// Offset not found - return nil
return nil, logBuffer.offset, nil
}
// TIMESTAMP-BASED READ (original logic)
// Read from disk and memory
// 1. read from disk, last time is = td
// 2. in memory, the earliest time = tm
// if tm <= td, case 2.1
// read from memory
// if tm is empty, case 2.2
// read from memory
// if td < tm, case 2.3
// read from disk again
var tsMemory time.Time
if !logBuffer.startTime.IsZero() {
tsMemory = logBuffer.startTime
}
for _, prevBuf := range logBuffer.prevBuffers.buffers {
if !prevBuf.startTime.IsZero() {
// If tsMemory is zero, assign directly; otherwise compare
if tsMemory.IsZero() || prevBuf.startTime.Before(tsMemory) {
tsMemory = prevBuf.startTime
}
}
}
if tsMemory.IsZero() { // case 2.2
return nil, -2, nil
} else if lastReadPosition.Time.Before(tsMemory) { // case 2.3
// For time-based reads, only check timestamp for disk reads
// Don't use offset comparisons as they're not meaningful for time-based subscriptions
// Special case: If requested time is zero (Unix epoch), treat as "start from beginning"
// This handles queries that want to read all data without knowing the exact start time
if lastReadPosition.Time.IsZero() || lastReadPosition.Time.Unix() == 0 {
// Start from the beginning of memory
// Fall through to case 2.1 to read from earliest buffer
} else if lastReadPosition.Offset <= 0 && lastReadPosition.Time.Before(tsMemory) {
// Treat first read with sentinel/zero offset as inclusive of earliest in-memory data
} else {
// Data not in memory buffers - read from disk
return nil, -2, ResumeFromDiskError
}
}
// the following is case 2.1
if lastReadPosition.Time.Equal(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
// For first-read sentinel/zero offset, allow inclusive read at the boundary
if lastReadPosition.Offset > 0 {
return nil, logBuffer.offset, nil
}
}
if lastReadPosition.Time.After(logBuffer.stopTime) && !logBuffer.stopTime.IsZero() {
return nil, logBuffer.offset, nil
}
// Also check prevBuffers when current buffer is empty (startTime is zero)
if lastReadPosition.Time.Before(logBuffer.startTime) || logBuffer.startTime.IsZero() {
for _, buf := range logBuffer.prevBuffers.buffers {
if buf.startTime.After(lastReadPosition.Time) {
return copiedBytes(buf.buf[:buf.size]), buf.offset, nil
}
if !buf.startTime.After(lastReadPosition.Time) && buf.stopTime.After(lastReadPosition.Time) {
searchTime := lastReadPosition.Time
if lastReadPosition.Offset <= 0 {
searchTime = searchTime.Add(-time.Nanosecond)
}
pos, err := buf.locateByTs(searchTime)
if err != nil {
// Buffer corruption detected - return error wrapped with ErrBufferCorrupted
glog.Errorf("ReadFromBuffer: buffer corruption in prevBuffer: %v", err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
return copiedBytes(buf.buf[pos:buf.size]), buf.offset, nil
}
}
// If current buffer is not empty, return it
if logBuffer.pos > 0 {
return copiedBytes(logBuffer.buf[:logBuffer.pos]), logBuffer.offset, nil
}
// Buffer is empty and no data in prevBuffers - wait for new data
return nil, logBuffer.offset, nil
}
lastTs := lastReadPosition.Time.UnixNano()
// Inclusive boundary for first-read sentinel/zero offset
searchTs := lastTs
if lastReadPosition.Offset <= 0 {
if searchTs > math.MinInt64+1 { // prevent underflow
searchTs = searchTs - 1
}
}
l, h := 0, len(logBuffer.idx)-1
/*
for i, pos := range m.idx {
logEntry, ts := readTs(m.buf, pos)
event := &filer_pb.SubscribeMetadataResponse{}
proto.Unmarshal(logEntry.Data, event)
entry := event.EventNotification.OldEntry
if entry == nil {
entry = event.EventNotification.NewEntry
}
}
*/
for l <= h {
mid := (l + h) / 2
pos := logBuffer.idx[mid]
_, t, err := readTs(logBuffer.buf, pos)
if err != nil {
// Buffer corruption detected in binary search
glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid, pos, err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
if t <= searchTs {
l = mid + 1
} else if searchTs < t {
var prevT int64
if mid > 0 {
_, prevT, err = readTs(logBuffer.buf, logBuffer.idx[mid-1])
if err != nil {
// Buffer corruption detected in binary search (previous entry)
glog.Errorf("ReadFromBuffer: buffer corruption at idx[%d] pos %d: %v", mid-1, logBuffer.idx[mid-1], err)
return nil, -1, fmt.Errorf("%w: %v", ErrBufferCorrupted, err)
}
}
if prevT <= searchTs {
return copiedBytes(logBuffer.buf[pos:logBuffer.pos]), logBuffer.offset, nil
}
h = mid
}
}
// Binary search didn't find the timestamp - data may have been flushed to disk already
// Returning -2 signals to caller that data is not available in memory
return nil, -2, nil
}
func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
bufferPool.Put(b)
}
// GetName returns the log buffer name for metadata tracking
func (logBuffer *LogBuffer) GetName() string {
logBuffer.RLock()
defer logBuffer.RUnlock()
return logBuffer.name
}
// GetOffset returns the current offset for metadata tracking
func (logBuffer *LogBuffer) GetOffset() int64 {
logBuffer.RLock()
defer logBuffer.RUnlock()
return logBuffer.offset
}
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
func copiedBytes(buf []byte) (copied *bytes.Buffer) {
copied = bufferPool.Get().(*bytes.Buffer)
copied.Reset()
copied.Write(buf)
return
}
func readTs(buf []byte, pos int) (size int, ts int64, err error) {
// Bounds check for size field (overflow-safe)
if pos < 0 || pos > len(buf)-4 {
return 0, 0, fmt.Errorf("corrupted log buffer: cannot read size at pos %d, buffer length %d", pos, len(buf))
}
size = int(util.BytesToUint32(buf[pos : pos+4]))
// Bounds check for entry data (overflow-safe, protects against negative size)
if size < 0 || size > len(buf)-pos-4 {
return 0, 0, fmt.Errorf("corrupted log buffer: entry size %d at pos %d exceeds buffer length %d", size, pos, len(buf))
}
entryData := buf[pos+4 : pos+4+size]
logEntry := &filer_pb.LogEntry{}
err = proto.Unmarshal(entryData, logEntry)
if err != nil {
// Return error instead of failing fast
// This allows caller to handle corruption gracefully
return 0, 0, fmt.Errorf("corrupted log buffer: failed to unmarshal LogEntry at pos %d, size %d: %w", pos, size, err)
}
return size, logEntry.TsNs, nil
}