Files
seaweedFS/weed/query/engine/hybrid_message_scanner.go
Chris Lu 3f946fc0c0 mount: make metadata cache rebuilds snapshot-consistent (#8531)
* filer: expose metadata events and list snapshots

* mount: invalidate hot directory caches

* mount: read hot directories directly from filer

* mount: add sequenced metadata cache applier

* mount: apply metadata responses through cache applier

* mount: replay snapshot-consistent directory builds

* mount: dedupe self metadata events

* mount: factor directory build cleanup

* mount: replace proto marshal dedup with composite key and ring buffer

The dedup logic was doing a full deterministic proto.Marshal on every
metadata event just to produce a dedup key. Replace with a cheap
composite string key (TsNs|Directory|OldName|NewName).

Also replace the sliding-window slice (which leaked the backing array
unboundedly) with a fixed-size ring buffer that reuses the same array.

* filer: remove mutex and proto.Clone from request-scoped MetadataEventSink

MetadataEventSink is created per-request and only accessed by the
goroutine handling the gRPC call. The mutex and double proto.Clone
(once in Record, once in Last) were unnecessary overhead on every
filer write operation. Store the pointer directly instead.

* mount: skip proto.Clone for caller-owned metadata events

Add ApplyMetadataResponseOwned that takes ownership of the response
without cloning. Local metadata events (mkdir, create, flush, etc.)
are freshly constructed and never shared, so the clone is unnecessary.

* filer: only populate MetadataEvent on successful DeleteEntry

Avoid calling eventSink.Last() on error paths where the sink may
contain a partial event from an intermediate child deletion during
recursive deletes.

* mount: avoid map allocation in collectDirectoryNotifications

Replace the map with a fixed-size array and linear dedup. There are
at most 3 directories to notify (old parent, new parent, new child
if directory), so a 3-element array avoids the heap allocation on
every metadata event.

* mount: fix potential deadlock in enqueueApplyRequest

Release applyStateMu before the blocking channel send. Previously,
if the channel was full (cap 128), the send would block while holding
the mutex, preventing Shutdown from acquiring it to set applyClosed.

* mount: restore signature-based self-event filtering as fast path

Re-add the signature check that was removed when content-based dedup
was introduced. Checking signatures is O(1) on a small slice and
avoids enqueuing and processing events that originated from this
mount instance. The content-based dedup remains as a fallback.

* filer: send snapshotTsNs only in first ListEntries response

The snapshot timestamp is identical for every entry in a single
ListEntries stream. Sending it in every response message wastes
wire bandwidth for large directories. The client already reads
it only from the first response.

* mount: exit read-through mode after successful full directory listing

MarkDirectoryRefreshed was defined but never called, so directories
that entered read-through mode (hot invalidation threshold) stayed
there permanently, hitting the filer on every readdir even when cold.
Call it after a complete read-through listing finishes.

* mount: include event shape and full paths in dedup key

The previous dedup key only used Names, which could collapse distinct
rename targets. Include the event shape (C/D/U/R), source directory,
new parent path, and both entry names so structurally different events
are never treated as duplicates.

* mount: drain pending requests on shutdown in runApplyLoop

After receiving the shutdown sentinel, drain any remaining requests
from applyCh non-blockingly and signal each with errMetaCacheClosed
so callers waiting on req.done are released.

* mount: include IsDirectory in synthetic delete events

metadataDeleteEvent now accepts an isDirectory parameter so the
applier can distinguish directory deletes from file deletes. Rmdir
passes true, Unlink passes false.

* mount: fall back to synthetic event when MetadataEvent is nil

In mknod and mkdir, if the filer response omits MetadataEvent (e.g.
older filer without the field), synthesize an equivalent local
metadata event so the cache is always updated.

* mount: make Flush metadata apply best-effort after successful commit

After filer_pb.CreateEntryWithResponse succeeds, the entry is
persisted. Don't fail the Flush syscall if the local metadata cache
apply fails — log and invalidate the directory cache instead.
Also fall back to a synthetic event when MetadataEvent is nil.

* mount: make Rename metadata apply best-effort

The rename has already succeeded on the filer by the time we apply
the local metadata event. Log failures instead of returning errors
that would be dropped by the caller anyway.

* mount: make saveEntry metadata apply best-effort with fallback

After UpdateEntryWithResponse succeeds, treat local metadata apply
as non-fatal. Log and invalidate the directory cache on failure.
Also fall back to a synthetic event when MetadataEvent is nil.

* filer_pb: preserve snapshotTsNs on error in ReadDirAllEntriesWithSnapshot

Return the snapshot timestamp even when the first page fails, so
callers receive the snapshot boundary when partial data was received.

* filer: send snapshot token for empty directory listings

When no entries are streamed, send a final ListEntriesResponse with
only SnapshotTsNs so clients always receive the snapshot boundary.

* mount: distinguish not-found vs transient errors in lookupEntry

Return fuse.EIO for non-not-found filer errors instead of
unconditionally returning ENOENT, so transient failures don't
masquerade as missing entries.

* mount: make CacheRemoteObject metadata apply best-effort

The file content has already been cached successfully. Don't fail
the read if the local metadata cache update fails.

* mount: use consistent snapshot for readdir in direct mode

Capture the SnapshotTsNs from the first loadDirectoryEntriesDirect
call and store it on the DirectoryHandle. Subsequent batch loads
pass this stored timestamp so all batches use the same snapshot.

Also export DoSeaweedListWithSnapshot so mount can use it directly
with snapshot passthrough.

* filer_pb: fix test fake to send SnapshotTsNs only on first response

Match the server behavior: only the first ListEntriesResponse in a
page carries the snapshot timestamp, subsequent entries leave it zero.

* Fix nil pointer dereference in ListEntries stream consumers

Remove the empty-directory snapshot-only response from ListEntries
that sent a ListEntriesResponse with Entry==nil, which crashed every
raw stream consumer that assumed resp.Entry is always non-nil.

Also add defensive nil checks for resp.Entry in all raw ListEntries
stream consumers across: S3 listing, broker topic lookup, broker
topic config, admin dashboard, topic retention, hybrid message
scanner, Kafka integration, and consumer offset storage.

* Add nil guards for resp.Entry in remaining ListEntries stream consumers

Covers: S3 object lock check, MQ management dashboard (version/
partition/offset loops), and topic retention version loop.

* Make applyLocalMetadataEvent best-effort in Link and Symlink

The filer operations already succeeded; failing the syscall because
the local cache apply failed is wrong. Log a warning and invalidate
the parent directory cache instead.

* Make applyLocalMetadataEvent best-effort in Mkdir/Rmdir/Mknod/Unlink

The filer RPC already committed; don't fail the syscall when the
local metadata cache apply fails. Log a warning and invalidate the
parent directory cache to force a re-fetch on next access.

* flushFileMetadata: add nil-fallback for metadata event and best-effort apply

Synthesize a metadata event when resp.GetMetadataEvent() is nil
(matching doFlush), and make the apply best-effort with cache
invalidation on failure.

* Prevent double-invocation of cleanupBuild in doEnsureVisited

Add a cleanupDone guard so the deferred cleanup and inline error-path
cleanup don't both call DeleteFolderChildren/AbortDirectoryBuild.

* Fix comment: signature check is O(n) not O(1)

* Prevent deferred cleanup after successful CompleteDirectoryBuild

Set cleanupDone before returning from the success path so the
deferred context-cancellation check cannot undo a published build.

* Invalidate parent directory caches on rename metadata apply failure

When applyLocalMetadataEvent fails during rename, invalidate the
source and destination parent directory caches so subsequent accesses
trigger a re-fetch from the filer.

* Add event nil-fallback and cache invalidation to Link and Symlink

Synthesize metadata events when the server doesn't return one, and
invalidate parent directory caches on apply failure.

* Match requested partition when scanning partition directories

Parse the partition range format (NNNN-NNNN) and match against the
requested partition parameter instead of using the first directory.

* Preserve snapshot timestamp across empty directory listings

Initialize actualSnapshotTsNs from the caller-requested value so it
isn't lost when the server returns no entries. Re-add the server-side
snapshot-only response for empty directories (all raw stream consumers
now have nil guards for Entry).

* Fix CreateEntry error wrapping to support errors.Is/errors.As

Use errors.New + %w instead of %v for resp.Error so callers can
unwrap the underlying error.

* Fix object lock pagination: only advance on non-nil entries

Move entriesReceived inside the nil check so nil entries don't
cause repeated ListEntries calls with the same lastFileName.

* Guard Attributes nil check before accessing Mtime in MQ management

* Do not send nil-Entry response for empty directory listings

The snapshot-only ListEntriesResponse (with Entry == nil) for empty
directories breaks consumers that treat any received response as an
entry (Java FilerClient, S3 listing). The Go client-side
DoSeaweedListWithSnapshot already preserves the caller-requested
snapshot via actualSnapshotTsNs initialization, so the server-side
send is unnecessary.

* Fix review findings: subscriber dedup, invalidation normalization, nil guards, shutdown race

- Remove self-signature early-return in processEventFn so all events
  flow through the applier (directory-build buffering sees self-originated
  events that arrive after a snapshot)
- Normalize NewParentPath in collectEntryInvalidations to avoid duplicate
  invalidations when NewParentPath is empty (same-directory update)
- Guard resp.Entry.Attributes for nil in admin_server.go and
  topic_retention.go to prevent panics on entries without attributes
- Fix enqueueApplyRequest race with shutdown by using select on both
  applyCh and applyDone, preventing sends after the apply loop exits
- Add cleanupDone check to deferred cleanup in meta_cache_init.go for
  clarity alongside the existing guard in cleanupBuild
- Add empty directory test case for snapshot consistency

* Propagate authoritative metadata event from CacheRemoteObjectToLocalCluster and generate client-side snapshot for empty directories

- Add metadata_event field to CacheRemoteObjectToLocalClusterResponse
  proto so the filer-emitted event is available to callers
- Use WithMetadataEventSink in the server handler to capture the event
  from NotifyUpdateEvent and return it on the response
- Update filehandle_read.go to prefer the RPC's metadata event over
  a locally fabricated one, falling back to metadataUpdateEvent when
  the server doesn't provide one (e.g., older filers)
- Generate a client-side snapshot cutoff in DoSeaweedListWithSnapshot
  when the server sends no snapshot (empty directory), so callers like
  CompleteDirectoryBuild get a meaningful boundary for filtering
  buffered events

* Skip directory notifications for dirs being built to prevent mid-build cache wipe

When a metadata event is buffered during a directory build,
applyMetadataSideEffects was still firing noteDirectoryUpdate for the
building directory. If the directory accumulated enough updates to
become "hot", markDirectoryReadThrough would call DeleteFolderChildren,
wiping entries that EnsureVisited had already inserted. The build would
then complete and mark the directory cached with incomplete data.

Fix by using applyMetadataSideEffectsSkippingBuildingDirs for buffered
events, which suppresses directory notifications for dirs currently in
buildingDirs while still applying entry invalidations.

* Add test for directory notification suppression during active build

TestDirectoryNotificationsSuppressedDuringBuild verifies that metadata
events targeting a directory under active EnsureVisited build do NOT
fire onDirectoryUpdate for that directory. In production, this prevents
markDirectoryReadThrough from calling DeleteFolderChildren mid-build,
which would wipe entries already inserted by the listing.

The test inserts an entry during a build, sends multiple metadata events
for the building directory, asserts no notifications fired for it,
verifies the entry survives, and confirms buffered events are replayed
after CompleteDirectoryBuild.

* Fix create invalidations, build guard, event shape, context, and snapshot error path

- collectEntryInvalidations: invalidate FUSE kernel cache on pure
  create events (OldEntry==nil && NewEntry!=nil), not just updates
  and deletes
- completeDirectoryBuildNow: only call markCachedFn when an active
  build existed (state != nil), preventing an unpopulated directory
  from being marked as cached
- Add metadataCreateEvent helper that produces a create-shaped event
  (NewEntry only, no OldEntry) and use it in mkdir, mknod, symlink,
  and hardlink create fallback paths instead of metadataUpdateEvent
  which incorrectly set both OldEntry and NewEntry
- applyMetadataResponseEnqueue: use context.Background() for the
  queued mutation so a cancelled caller context cannot abort the
  apply loop mid-write
- DoSeaweedListWithSnapshot: move snapshot initialization before
  ListEntries call so the error path returns the preserved snapshot
  instead of 0

* Fix review findings: test loop, cache race, context safety, snapshot consistency

- Fix build test loop starting at i=1 instead of i=0, missing new-0.txt verification
- Re-check IsDirectoryCached after cache miss to avoid ENOENT race with markDirectoryReadThrough
- Use context.Background() in enqueueAndWait so caller cancellation can't abort build/complete mid-way
- Pass dh.snapshotTsNs in skip-batch loadDirectoryEntriesDirect for snapshot consistency
- Prefer resp.MetadataEvent over fallback in Unlink event derivation
- Add comment on MetadataEventSink.Record single-event assumption

* Fix empty-directory snapshot clock skew and build cancellation race

Empty-directory snapshot: Remove client-side time.Now() synthesis when
the server returns no entries. Instead return snapshotTsNs=0, and in
completeDirectoryBuildNow replay ALL buffered events when snapshot is 0.
This eliminates the clock-skew bug where a client ahead of the filer
would filter out legitimate post-list events.

Build cancellation: Use context.Background() for BeginDirectoryBuild
and CompleteDirectoryBuild calls in doEnsureVisited, so errgroup
cancellation doesn't cause enqueueAndWait to return early and trigger
cleanupBuild while the operation is still queued.

* Add tests for empty-directory build replay and cancellation resilience

TestEmptyDirectoryBuildReplaysAllBufferedEvents: verifies that when
CompleteDirectoryBuild receives snapshotTsNs=0 (empty directory, no
server snapshot), ALL buffered events are replayed regardless of their
TsNs values — no clock-skew-sensitive filtering occurs.

TestBuildCompletionSurvivesCallerCancellation: verifies that once
CompleteDirectoryBuild is enqueued, a cancelled caller context does not
prevent the build from completing. The apply loop runs with
context.Background(), so the directory becomes cached and buffered
events are replayed even when the caller gives up waiting.

* Fix directory subtree cleanup, Link rollback, test robustness

- applyMetadataResponseLocked: when a directory entry is deleted or
  moved, call DeleteFolderChildren on the old path so cached descendants
  don't leak as stale entries.

- Link: save original HardLinkId/Counter before mutation. If
  CreateEntryWithResponse fails after the source was already updated,
  rollback the source entry to its original state via UpdateEntry.

- TestBuildCompletionSurvivesCallerCancellation: replace fixed
  time.Sleep(50ms) with a deadline-based poll that checks
  IsDirectoryCached in a loop, failing only after 2s timeout.

- TestReadDirAllEntriesWithSnapshotEmptyDirectory: assert that
  ListEntries was actually invoked on the mock client so the test
  exercises the RPC path.

- newMetadataEvent: add early return when both oldEntry and newEntry are
  nil to avoid emitting events with empty Directory.

---------

Co-authored-by: Copilot <copilot@github.com>
2026-03-07 09:19:40 -08:00

1909 lines
61 KiB
Go

package engine
import (
"container/heap"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/mq"
"github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/query/sqltypes"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"google.golang.org/protobuf/proto"
)
// HybridMessageScanner scans from ALL data sources:
// Architecture:
// 1. Unflushed in-memory data from brokers (mq_pb.DataMessage format) - REAL-TIME
// 2. Recent/live messages in log files (filer_pb.LogEntry format) - FLUSHED
// 3. Older messages in Parquet files (schema_pb.RecordValue format) - ARCHIVED
// 4. Seamlessly merges data from all sources chronologically
// 5. Provides complete real-time view of all messages in a topic
type HybridMessageScanner struct {
filerClient filer_pb.FilerClient
brokerClient BrokerClientInterface // For querying unflushed data
topic topic.Topic
recordSchema *schema_pb.RecordType
schemaFormat string // Serialization format: "AVRO", "PROTOBUF", "JSON_SCHEMA", or empty for schemaless
parquetLevels *schema.ParquetLevels
engine *SQLEngine // Reference for system column formatting
}
// NewHybridMessageScanner creates a scanner that reads from all data sources
// This provides complete real-time message coverage including unflushed data
func NewHybridMessageScanner(filerClient filer_pb.FilerClient, brokerClient BrokerClientInterface, namespace, topicName string, engine *SQLEngine) (*HybridMessageScanner, error) {
// Check if filerClient is available
if filerClient == nil {
return nil, fmt.Errorf("filerClient is required but not available")
}
// Create topic reference
t := topic.Topic{
Namespace: namespace,
Name: topicName,
}
// Get flat schema from broker client
recordType, _, schemaFormat, err := brokerClient.GetTopicSchema(context.Background(), namespace, topicName)
if err != nil {
return nil, fmt.Errorf("failed to get topic record type: %v", err)
}
if recordType == nil || len(recordType.Fields) == 0 {
// For topics without schema, create a minimal schema with system fields and _value
recordType = schema.RecordTypeBegin().
WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
WithField(SW_COLUMN_NAME_VALUE, schema.TypeBytes). // Raw message value
RecordTypeEnd()
} else {
// Create a copy of the recordType to avoid modifying the original
recordTypeCopy := &schema_pb.RecordType{
Fields: make([]*schema_pb.Field, len(recordType.Fields)),
}
copy(recordTypeCopy.Fields, recordType.Fields)
// Add system columns that MQ adds to all records
recordType = schema.NewRecordTypeBuilder(recordTypeCopy).
WithField(SW_COLUMN_NAME_TIMESTAMP, schema.TypeInt64).
WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
RecordTypeEnd()
}
// Convert to Parquet levels for efficient reading
parquetLevels, err := schema.ToParquetLevels(recordType)
if err != nil {
return nil, fmt.Errorf("failed to create Parquet levels: %v", err)
}
return &HybridMessageScanner{
filerClient: filerClient,
brokerClient: brokerClient,
topic: t,
recordSchema: recordType,
schemaFormat: schemaFormat,
parquetLevels: parquetLevels,
engine: engine,
}, nil
}
// HybridScanOptions configure how the scanner reads from both live and archived data
type HybridScanOptions struct {
// Time range filtering (Unix nanoseconds)
StartTimeNs int64
StopTimeNs int64
// Column projection - if empty, select all columns
Columns []string
// Row limit - 0 means no limit
Limit int
// Row offset - 0 means no offset
Offset int
// Predicate for WHERE clause filtering
Predicate func(*schema_pb.RecordValue) bool
}
// HybridScanResult represents a message from either live logs or Parquet files
type HybridScanResult struct {
Values map[string]*schema_pb.Value // Column name -> value
Timestamp int64 // Message timestamp (_ts_ns)
Key []byte // Message key (_key)
Source string // "live_log" or "parquet_archive" or "in_memory_broker"
}
// HybridScanStats contains statistics about data sources scanned
type HybridScanStats struct {
BrokerBufferQueried bool
BrokerBufferMessages int
BufferStartIndex int64
PartitionsScanned int
LiveLogFilesScanned int // Number of live log files processed
}
// ParquetColumnStats holds statistics for a single column from parquet metadata
type ParquetColumnStats struct {
ColumnName string
MinValue *schema_pb.Value
MaxValue *schema_pb.Value
NullCount int64
RowCount int64
}
// ParquetFileStats holds aggregated statistics for a parquet file
type ParquetFileStats struct {
FileName string
RowCount int64
ColumnStats map[string]*ParquetColumnStats
// Optional file-level timestamp range from filer extended attributes
MinTimestampNs int64
MaxTimestampNs int64
}
// getTimestampRangeFromStats returns (minTsNs, maxTsNs, ok) by inspecting common timestamp columns
func (h *HybridMessageScanner) getTimestampRangeFromStats(fileStats *ParquetFileStats) (int64, int64, bool) {
if fileStats == nil {
return 0, 0, false
}
// Prefer column stats for _ts_ns if present
if len(fileStats.ColumnStats) > 0 {
if s, ok := fileStats.ColumnStats[logstore.SW_COLUMN_NAME_TS]; ok && s != nil && s.MinValue != nil && s.MaxValue != nil {
if minNs, okMin := h.schemaValueToNs(s.MinValue); okMin {
if maxNs, okMax := h.schemaValueToNs(s.MaxValue); okMax {
return minNs, maxNs, true
}
}
}
}
// Fallback to file-level range if present in filer extended metadata
if fileStats.MinTimestampNs != 0 || fileStats.MaxTimestampNs != 0 {
return fileStats.MinTimestampNs, fileStats.MaxTimestampNs, true
}
return 0, 0, false
}
// schemaValueToNs converts a schema_pb.Value that represents a timestamp to ns
func (h *HybridMessageScanner) schemaValueToNs(v *schema_pb.Value) (int64, bool) {
if v == nil {
return 0, false
}
switch k := v.Kind.(type) {
case *schema_pb.Value_Int64Value:
return k.Int64Value, true
case *schema_pb.Value_Int32Value:
return int64(k.Int32Value), true
default:
return 0, false
}
}
// StreamingDataSource provides a streaming interface for reading scan results
type StreamingDataSource interface {
Next() (*HybridScanResult, error) // Returns next result or nil when done
HasMore() bool // Returns true if more data available
Close() error // Clean up resources
}
// StreamingMergeItem represents an item in the priority queue for streaming merge
type StreamingMergeItem struct {
Result *HybridScanResult
SourceID int
DataSource StreamingDataSource
}
// StreamingMergeHeap implements heap.Interface for merging sorted streams by timestamp
type StreamingMergeHeap []*StreamingMergeItem
func (h StreamingMergeHeap) Len() int { return len(h) }
func (h StreamingMergeHeap) Less(i, j int) bool {
// Sort by timestamp (ascending order)
return h[i].Result.Timestamp < h[j].Result.Timestamp
}
func (h StreamingMergeHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *StreamingMergeHeap) Push(x interface{}) {
*h = append(*h, x.(*StreamingMergeItem))
}
func (h *StreamingMergeHeap) Pop() interface{} {
old := *h
n := len(old)
item := old[n-1]
*h = old[0 : n-1]
return item
}
// Scan reads messages from both live logs and archived Parquet files
// Uses SeaweedFS MQ's GenMergedReadFunc for seamless integration
// Assumptions:
// 1. Chronologically merges live and archived data
// 2. Applies filtering at the lowest level for efficiency
// 3. Handles schema evolution transparently
func (hms *HybridMessageScanner) Scan(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.ScanWithStats(ctx, options)
return results, err
}
// ScanWithStats reads messages and returns scan statistics for execution plans
func (hms *HybridMessageScanner) ScanWithStats(ctx context.Context, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// Get all partitions for this topic via MQ broker discovery
partitions, err := hms.discoverTopicPartitions(ctx)
if err != nil {
return nil, stats, fmt.Errorf("failed to discover partitions for topic %s: %v", hms.topic.String(), err)
}
stats.PartitionsScanned = len(partitions)
for _, partition := range partitions {
partitionResults, partitionStats, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
if err != nil {
return nil, stats, fmt.Errorf("failed to scan partition %v: %v", partition, err)
}
results = append(results, partitionResults...)
// Aggregate broker buffer stats
if partitionStats != nil {
if partitionStats.BrokerBufferQueried {
stats.BrokerBufferQueried = true
}
stats.BrokerBufferMessages += partitionStats.BrokerBufferMessages
if partitionStats.BufferStartIndex > 0 && (stats.BufferStartIndex == 0 || partitionStats.BufferStartIndex < stats.BufferStartIndex) {
stats.BufferStartIndex = partitionStats.BufferStartIndex
}
}
// Apply global limit (without offset) across all partitions
// When OFFSET is used, collect more data to ensure we have enough after skipping
// Note: OFFSET will be applied at the end to avoid double-application
if options.Limit > 0 {
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
minRequired := options.Limit + options.Offset
// Small buffer only when needed to handle edge cases in distributed scanning
if options.Offset > 0 && minRequired < 10 {
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
}
if len(results) >= minRequired {
break
}
}
}
// Apply final OFFSET and LIMIT processing (done once at the end)
// Limit semantics: -1 = no limit, 0 = LIMIT 0 (empty), >0 = limit to N rows
if options.Offset > 0 || options.Limit >= 0 {
// Handle LIMIT 0 special case first
if options.Limit == 0 {
return []HybridScanResult{}, stats, nil
}
// Apply OFFSET first
if options.Offset > 0 {
if options.Offset >= len(results) {
results = []HybridScanResult{}
} else {
results = results[options.Offset:]
}
}
// Apply LIMIT after OFFSET (only if limit > 0)
if options.Limit > 0 && len(results) > options.Limit {
results = results[:options.Limit]
}
}
return results, stats, nil
}
// scanUnflushedData queries brokers for unflushed in-memory data using buffer_start deduplication
func (hms *HybridMessageScanner) scanUnflushedData(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
return results, err
}
// scanUnflushedDataWithStats queries brokers for unflushed data and returns statistics
func (hms *HybridMessageScanner) scanUnflushedDataWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
var results []HybridScanResult
stats := &HybridScanStats{}
// Skip if no broker client available
if hms.brokerClient == nil {
return results, stats, nil
}
// Mark that we attempted to query broker buffer
stats.BrokerBufferQueried = true
// Step 1: Get unflushed data from broker using buffer_start-based method
// This method uses buffer_start metadata to avoid double-counting with exact precision
unflushedEntries, err := hms.brokerClient.GetUnflushedMessages(ctx, hms.topic.Namespace, hms.topic.Name, partition, options.StartTimeNs)
if err != nil {
// Log error but don't fail the query - continue with disk data only
// Reset queried flag on error
stats.BrokerBufferQueried = false
return results, stats, nil
}
// Capture stats for EXPLAIN
stats.BrokerBufferMessages = len(unflushedEntries)
// Step 2: Process unflushed entries (already deduplicated by broker)
for _, logEntry := range unflushedEntries {
// Pre-decode DataMessage for reuse in both control check and conversion
var dataMessage *mq_pb.DataMessage
if len(logEntry.Data) > 0 {
dataMessage = &mq_pb.DataMessage{}
if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
dataMessage = nil // Failed to decode, treat as raw data
}
}
// Skip control entries without actual data
if hms.isControlEntryWithDecoded(logEntry, dataMessage) {
continue // Skip this entry
}
// Skip messages outside time range
if options.StartTimeNs > 0 && logEntry.TsNs < options.StartTimeNs {
continue
}
if options.StopTimeNs > 0 && logEntry.TsNs > options.StopTimeNs {
continue
}
// Convert LogEntry to RecordValue format (same as disk data)
recordValue, _, err := hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
if err != nil {
continue // Skip malformed messages
}
// Apply predicate filter if provided
if options.Predicate != nil && !options.Predicate(recordValue) {
continue
}
// Extract system columns for result
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
// Apply column projection
values := make(map[string]*schema_pb.Value)
if len(options.Columns) == 0 {
// Select all columns (excluding system columns from user view)
for name, value := range recordValue.Fields {
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
values[name] = value
}
}
} else {
// Select specified columns only
for _, columnName := range options.Columns {
if value, exists := recordValue.Fields[columnName]; exists {
values[columnName] = value
}
}
}
// Create result with proper source tagging
result := HybridScanResult{
Values: values,
Timestamp: timestamp,
Key: key,
Source: "live_log", // Data from broker's unflushed messages
}
results = append(results, result)
// Apply limit (accounting for offset) - collect exact amount needed
if options.Limit > 0 {
// Collect exact amount needed: LIMIT + OFFSET (no excessive doubling)
minRequired := options.Limit + options.Offset
// Small buffer only when needed to handle edge cases in message streaming
if options.Offset > 0 && minRequired < 10 {
minRequired = minRequired + 1 // Add 1 extra row buffer, not doubling
}
if len(results) >= minRequired {
break
}
}
}
return results, stats, nil
}
// convertDataMessageToRecord converts mq_pb.DataMessage to schema_pb.RecordValue
func (hms *HybridMessageScanner) convertDataMessageToRecord(msg *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
// Parse the message data as RecordValue
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(msg.Value, recordValue); err != nil {
return nil, "", fmt.Errorf("failed to unmarshal message data: %v", err)
}
// Add system columns
if recordValue.Fields == nil {
recordValue.Fields = make(map[string]*schema_pb.Value)
}
// Add timestamp
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: msg.TsNs},
}
return recordValue, string(msg.Key), nil
}
// discoverTopicPartitions discovers the actual partitions for this topic by scanning the filesystem
// This finds real partition directories like v2025-09-01-07-16-34/0000-0630/
func (hms *HybridMessageScanner) discoverTopicPartitions(ctx context.Context) ([]topic.Partition, error) {
if hms.filerClient == nil {
return nil, fmt.Errorf("filerClient not available for partition discovery")
}
var allPartitions []topic.Partition
var err error
// Scan the topic directory for actual partition versions (timestamped directories)
// List all version directories in the topic directory
err = filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(hms.topic.Dir()), "", func(versionEntry *filer_pb.Entry, isLast bool) error {
if !versionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse version timestamp from directory name (e.g., "v2025-09-01-07-16-34")
versionTime, parseErr := topic.ParseTopicVersion(versionEntry.Name)
if parseErr != nil {
// Skip directories that don't match the version format
return nil
}
// Scan partition directories within this version
versionDir := fmt.Sprintf("%s/%s", hms.topic.Dir(), versionEntry.Name)
return filer_pb.ReadDirAllEntries(ctx, hms.filerClient, util.FullPath(versionDir), "", func(partitionEntry *filer_pb.Entry, isLast bool) error {
if !partitionEntry.IsDirectory {
return nil // Skip non-directories
}
// Parse partition boundary from directory name (e.g., "0000-0630")
rangeStart, rangeStop := topic.ParsePartitionBoundary(partitionEntry.Name)
if rangeStart == rangeStop {
return nil // Skip invalid partition names
}
// Create partition object
partition := topic.Partition{
RangeStart: rangeStart,
RangeStop: rangeStop,
RingSize: topic.PartitionCount,
UnixTimeNs: versionTime.UnixNano(),
}
allPartitions = append(allPartitions, partition)
return nil
})
})
if err != nil {
return nil, fmt.Errorf("failed to scan topic directory for partitions: %v", err)
}
// If no partitions found, return empty slice (valid for newly created or empty topics)
if len(allPartitions) == 0 {
fmt.Printf("No partitions found for topic %s - returning empty result set\n", hms.topic.String())
return []topic.Partition{}, nil
}
fmt.Printf("Discovered %d partitions for topic %s\n", len(allPartitions), hms.topic.String())
return allPartitions, nil
}
// scanPartitionHybrid scans a specific partition using the hybrid approach
// This is where the magic happens - seamlessly reading ALL data sources:
// 1. Unflushed in-memory data from brokers (REAL-TIME)
// 2. Live logs + Parquet files from disk (FLUSHED/ARCHIVED)
func (hms *HybridMessageScanner) scanPartitionHybrid(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, error) {
results, _, err := hms.scanPartitionHybridWithStats(ctx, partition, options)
return results, err
}
// scanPartitionHybridWithStats scans a specific partition using streaming merge for memory efficiency
// PERFORMANCE IMPROVEMENT: Uses heap-based streaming merge instead of collecting all data and sorting
// - Memory usage: O(k) where k = number of data sources, instead of O(n) where n = total records
// - Scalable: Can handle large topics without LIMIT clauses efficiently
// - Streaming: Processes data as it arrives rather than buffering everything
func (hms *HybridMessageScanner) scanPartitionHybridWithStats(ctx context.Context, partition topic.Partition, options HybridScanOptions) ([]HybridScanResult, *HybridScanStats, error) {
stats := &HybridScanStats{}
// STEP 1: Scan unflushed in-memory data from brokers (REAL-TIME)
unflushedResults, unflushedStats, err := hms.scanUnflushedDataWithStats(ctx, partition, options)
if err != nil {
// Don't fail the query if broker scanning fails, but provide clear warning to user
// This ensures users are aware that results may not include the most recent data
fmt.Printf("Warning: Unable to access real-time data from message broker: %v\n", err)
fmt.Printf("Note: Query results may not include the most recent unflushed messages\n")
} else if unflushedStats != nil {
stats.BrokerBufferQueried = unflushedStats.BrokerBufferQueried
stats.BrokerBufferMessages = unflushedStats.BrokerBufferMessages
stats.BufferStartIndex = unflushedStats.BufferStartIndex
}
// Count live log files for statistics
liveLogCount, err := hms.countLiveLogFiles(partition)
if err != nil {
// Don't fail the query, just log warning
fmt.Printf("Warning: Failed to count live log files: %v\n", err)
liveLogCount = 0
}
stats.LiveLogFilesScanned = liveLogCount
// STEP 2: Create streaming data sources for memory-efficient merge
var dataSources []StreamingDataSource
// Add unflushed data source (if we have unflushed results)
if len(unflushedResults) > 0 {
// Sort unflushed results by timestamp before creating stream
if len(unflushedResults) > 1 {
hms.mergeSort(unflushedResults, 0, len(unflushedResults)-1)
}
dataSources = append(dataSources, NewSliceDataSource(unflushedResults))
}
// Add streaming flushed data source (live logs + Parquet files)
flushedDataSource := NewStreamingFlushedDataSource(hms, partition, options)
dataSources = append(dataSources, flushedDataSource)
// STEP 3: Use streaming merge for memory-efficient chronological ordering
var results []HybridScanResult
if len(dataSources) > 0 {
// Calculate how many rows we need to collect during scanning (before OFFSET/LIMIT)
// For LIMIT N OFFSET M, we need to collect at least N+M rows
scanLimit := options.Limit
if options.Limit > 0 && options.Offset > 0 {
scanLimit = options.Limit + options.Offset
}
mergedResults, err := hms.streamingMerge(dataSources, scanLimit)
if err != nil {
return nil, stats, fmt.Errorf("streaming merge failed: %v", err)
}
results = mergedResults
}
return results, stats, nil
}
// countLiveLogFiles counts the number of live log files in a partition for statistics
func (hms *HybridMessageScanner) countLiveLogFiles(partition topic.Partition) (int, error) {
partitionDir := topic.PartitionDir(hms.topic, partition)
var fileCount int
err := hms.filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// List all files in partition directory
request := &filer_pb.ListEntriesRequest{
Directory: partitionDir,
Prefix: "",
StartFromFileName: "",
InclusiveStartFrom: true,
Limit: 10000, // reasonable limit for counting
}
stream, err := client.ListEntries(context.Background(), request)
if err != nil {
return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
if resp.Entry == nil {
continue
}
// Count files that are not .parquet files (live log files)
// Live log files typically have timestamps or are named like log files
fileName := resp.Entry.Name
if !strings.HasSuffix(fileName, ".parquet") &&
!strings.HasSuffix(fileName, ".offset") &&
len(resp.Entry.Chunks) > 0 { // Has actual content
fileCount++
}
}
return nil
})
if err != nil {
return 0, err
}
return fileCount, nil
}
// isControlEntry checks if a log entry is a control entry without actual data
// Based on MQ system analysis, control entries are:
// 1. DataMessages with populated Ctrl field (publisher close signals)
// 2. Entries with empty keys (as filtered by subscriber)
// NOTE: Messages with empty data but valid keys (like NOOP messages) are NOT control entries
func (hms *HybridMessageScanner) isControlEntry(logEntry *filer_pb.LogEntry) bool {
// Pre-decode DataMessage if needed
var dataMessage *mq_pb.DataMessage
if len(logEntry.Data) > 0 {
dataMessage = &mq_pb.DataMessage{}
if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
dataMessage = nil // Failed to decode, treat as raw data
}
}
return hms.isControlEntryWithDecoded(logEntry, dataMessage)
}
// isControlEntryWithDecoded checks if a log entry is a control entry using pre-decoded DataMessage
// This avoids duplicate protobuf unmarshaling when the DataMessage is already decoded
func (hms *HybridMessageScanner) isControlEntryWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) bool {
// Skip entries with empty keys (same logic as subscriber)
if len(logEntry.Key) == 0 {
return true
}
// Check if this is a DataMessage with control field populated
if dataMessage != nil && dataMessage.Ctrl != nil {
return true
}
// Messages with valid keys (even if data is empty) are legitimate messages
// Examples: NOOP messages from Schema Registry
return false
}
// isNullOrEmpty checks if a schema_pb.Value is null or empty
func isNullOrEmpty(value *schema_pb.Value) bool {
if value == nil {
return true
}
switch v := value.Kind.(type) {
case *schema_pb.Value_StringValue:
return v.StringValue == ""
case *schema_pb.Value_BytesValue:
return len(v.BytesValue) == 0
case *schema_pb.Value_ListValue:
return v.ListValue == nil || len(v.ListValue.Values) == 0
case nil:
return true // No kind set means null
default:
return false
}
}
// isSchemaless checks if the scanner is configured for a schema-less topic
// Schema-less topics only have system fields: _ts_ns, _key, and _value
func (hms *HybridMessageScanner) isSchemaless() bool {
// Schema-less topics only have system fields: _ts_ns, _key, and _value
// System topics like _schemas are NOT schema-less - they have structured data
// We just need to map their fields during read
if hms.recordSchema == nil {
return false
}
// Count only non-system data fields (exclude _ts_ns and _key which are always present)
// Schema-less topics should only have _value as the data field
hasValue := false
dataFieldCount := 0
for _, field := range hms.recordSchema.Fields {
switch field.Name {
case SW_COLUMN_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY:
// System fields - ignore
continue
case SW_COLUMN_NAME_VALUE:
hasValue = true
dataFieldCount++
default:
// Any other field means it's not schema-less
dataFieldCount++
}
}
// Schema-less = only has _value field as the data field (plus system fields)
return hasValue && dataFieldCount == 1
}
// convertLogEntryToRecordValue converts a filer_pb.LogEntry to schema_pb.RecordValue
// This handles both:
// 1. Live log entries (raw message format)
// 2. Parquet entries (already in schema_pb.RecordValue format)
// 3. Schema-less topics (raw bytes in _value field)
func (hms *HybridMessageScanner) convertLogEntryToRecordValue(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
// For schema-less topics, put raw data directly into _value field
if hms.isSchemaless() {
recordValue := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
}
return recordValue, "live_log", nil
}
// Try to unmarshal as RecordValue first (Parquet format)
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(logEntry.Data, recordValue); err == nil {
// This is an archived message from Parquet files
// FIX: Add system columns from LogEntry to RecordValue
if recordValue.Fields == nil {
recordValue.Fields = make(map[string]*schema_pb.Value)
}
// Add system columns from LogEntry
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
return recordValue, "parquet_archive", nil
}
// If not a RecordValue, this is raw live message data - parse with schema
return hms.parseRawMessageWithSchema(logEntry)
}
// min returns the minimum of two integers
func min(a, b int) int {
if a < b {
return a
}
return b
}
// parseRawMessageWithSchema parses raw live message data using the topic's schema
// This provides proper type conversion and field mapping instead of treating everything as strings
func (hms *HybridMessageScanner) parseRawMessageWithSchema(logEntry *filer_pb.LogEntry) (*schema_pb.RecordValue, string, error) {
recordValue := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
// Add system columns (always present)
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
// Parse message data based on schema
if hms.recordSchema == nil || len(hms.recordSchema.Fields) == 0 {
// Fallback: No schema available, use "_value" for schema-less topics only
if hms.isSchemaless() {
recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
}
}
return recordValue, "live_log", nil
}
// Use schema format to directly choose the right decoder
// This avoids trying multiple decoders and improves performance
var parsedRecord *schema_pb.RecordValue
var err error
switch hms.schemaFormat {
case "AVRO":
// AVRO format - use Avro decoder
// Note: Avro decoding requires schema registry integration
// For now, fall through to JSON as many Avro messages are also valid JSON
parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
case "PROTOBUF":
// PROTOBUF format - use protobuf decoder
parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
case "JSON_SCHEMA", "":
// JSON_SCHEMA format or empty (default to JSON)
// JSON is the most common format for schema registry
parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
if err != nil {
// Try protobuf as fallback
parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
}
default:
// Unknown format - try JSON first, then protobuf as fallback
parsedRecord, err = hms.parseJSONMessage(logEntry.Data)
if err != nil {
parsedRecord, err = hms.parseProtobufMessage(logEntry.Data)
}
}
if err == nil && parsedRecord != nil {
// Successfully parsed, merge with system columns
for fieldName, fieldValue := range parsedRecord.Fields {
recordValue.Fields[fieldName] = fieldValue
}
return recordValue, "live_log", nil
}
// Fallback: If schema has a single field, map the raw data to it with type conversion
if len(hms.recordSchema.Fields) == 1 {
field := hms.recordSchema.Fields[0]
convertedValue, convErr := hms.convertRawDataToSchemaValue(logEntry.Data, field.Type)
if convErr == nil {
recordValue.Fields[field.Name] = convertedValue
return recordValue, "live_log", nil
}
}
// Final fallback: treat as bytes field for schema-less topics only
if hms.isSchemaless() {
recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
}
}
return recordValue, "live_log", nil
}
// convertLogEntryToRecordValueWithDecoded converts a filer_pb.LogEntry to schema_pb.RecordValue
// using a pre-decoded DataMessage to avoid duplicate protobuf unmarshaling
func (hms *HybridMessageScanner) convertLogEntryToRecordValueWithDecoded(logEntry *filer_pb.LogEntry, dataMessage *mq_pb.DataMessage) (*schema_pb.RecordValue, string, error) {
// IMPORTANT: Check for schema-less topics FIRST
// Schema-less topics (like _schemas) should store raw data directly in _value field
if hms.isSchemaless() {
recordValue := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
recordValue.Fields[SW_COLUMN_NAME_VALUE] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Data},
}
return recordValue, "live_log", nil
}
// CRITICAL: The broker stores DataMessage.Value directly in LogEntry.Data
// So we need to try unmarshaling LogEntry.Data as RecordValue first
var recordValueBytes []byte
if dataMessage != nil && len(dataMessage.Value) > 0 {
// DataMessage has a Value field - use it
recordValueBytes = dataMessage.Value
} else {
// DataMessage doesn't have Value, use LogEntry.Data directly
// This is the normal case when broker stores messages
recordValueBytes = logEntry.Data
}
// Try to unmarshal as RecordValue
if len(recordValueBytes) > 0 {
recordValue := &schema_pb.RecordValue{}
if err := proto.Unmarshal(recordValueBytes, recordValue); err == nil {
// Successfully unmarshaled as RecordValue
// Ensure Fields map exists
if recordValue.Fields == nil {
recordValue.Fields = make(map[string]*schema_pb.Value)
}
// Add system columns from LogEntry
recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP] = &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: logEntry.TsNs},
}
recordValue.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: logEntry.Key},
}
return recordValue, "live_log", nil
}
// If unmarshaling as RecordValue fails, fall back to schema-aware parsing
}
// For cases where protobuf unmarshaling failed or data is empty,
// attempt schema-aware parsing to try JSON, protobuf, and other formats
return hms.parseRawMessageWithSchema(logEntry)
}
// parseJSONMessage attempts to parse raw data as JSON and map to schema fields
func (hms *HybridMessageScanner) parseJSONMessage(data []byte) (*schema_pb.RecordValue, error) {
// Try to parse as JSON
var jsonData map[string]interface{}
if err := json.Unmarshal(data, &jsonData); err != nil {
return nil, fmt.Errorf("not valid JSON: %v", err)
}
recordValue := &schema_pb.RecordValue{
Fields: make(map[string]*schema_pb.Value),
}
// Map JSON fields to schema fields
for _, schemaField := range hms.recordSchema.Fields {
fieldName := schemaField.Name
if jsonValue, exists := jsonData[fieldName]; exists {
schemaValue, err := hms.convertJSONValueToSchemaValue(jsonValue, schemaField.Type)
if err != nil {
// Log conversion error but continue with other fields
continue
}
recordValue.Fields[fieldName] = schemaValue
}
}
return recordValue, nil
}
// parseProtobufMessage attempts to parse raw data as protobuf RecordValue
func (hms *HybridMessageScanner) parseProtobufMessage(data []byte) (*schema_pb.RecordValue, error) {
// This might be a raw protobuf message that didn't parse correctly the first time
// Try alternative protobuf unmarshaling approaches
recordValue := &schema_pb.RecordValue{}
// Strategy 1: Direct unmarshaling (might work if it's actually a RecordValue)
if err := proto.Unmarshal(data, recordValue); err == nil {
return recordValue, nil
}
// Strategy 2: Check if it's a different protobuf message type
// For now, return error as we need more specific knowledge of MQ message formats
return nil, fmt.Errorf("could not parse as protobuf RecordValue")
}
// convertRawDataToSchemaValue converts raw bytes to a specific schema type
func (hms *HybridMessageScanner) convertRawDataToSchemaValue(data []byte, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
dataStr := string(data)
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
scalarType := fieldType.GetScalarType()
switch scalarType {
case schema_pb.ScalarType_STRING:
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: dataStr},
}, nil
case schema_pb.ScalarType_INT32:
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 32); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(val)},
}, nil
}
case schema_pb.ScalarType_INT64:
if val, err := strconv.ParseInt(strings.TrimSpace(dataStr), 10, 64); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: val},
}, nil
}
case schema_pb.ScalarType_FLOAT:
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 32); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(val)},
}, nil
}
case schema_pb.ScalarType_DOUBLE:
if val, err := strconv.ParseFloat(strings.TrimSpace(dataStr), 64); err == nil {
return &schema_pb.Value{
Kind: &schema_pb.Value_DoubleValue{DoubleValue: val},
}, nil
}
case schema_pb.ScalarType_BOOL:
lowerStr := strings.ToLower(strings.TrimSpace(dataStr))
if lowerStr == "true" || lowerStr == "1" || lowerStr == "yes" {
return &schema_pb.Value{
Kind: &schema_pb.Value_BoolValue{BoolValue: true},
}, nil
} else if lowerStr == "false" || lowerStr == "0" || lowerStr == "no" {
return &schema_pb.Value{
Kind: &schema_pb.Value_BoolValue{BoolValue: false},
}, nil
}
case schema_pb.ScalarType_BYTES:
return &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: data},
}, nil
}
}
return nil, fmt.Errorf("unsupported type conversion for %v", fieldType)
}
// convertJSONValueToSchemaValue converts a JSON value to schema_pb.Value based on schema type
func (hms *HybridMessageScanner) convertJSONValueToSchemaValue(jsonValue interface{}, fieldType *schema_pb.Type) (*schema_pb.Value, error) {
switch fieldType.Kind.(type) {
case *schema_pb.Type_ScalarType:
scalarType := fieldType.GetScalarType()
switch scalarType {
case schema_pb.ScalarType_STRING:
if str, ok := jsonValue.(string); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: str},
}, nil
}
// Convert other types to string
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: fmt.Sprintf("%v", jsonValue)},
}, nil
case schema_pb.ScalarType_INT32:
if num, ok := jsonValue.(float64); ok { // JSON numbers are float64
return &schema_pb.Value{
Kind: &schema_pb.Value_Int32Value{Int32Value: int32(num)},
}, nil
}
case schema_pb.ScalarType_INT64:
if num, ok := jsonValue.(float64); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: int64(num)},
}, nil
}
case schema_pb.ScalarType_FLOAT:
if num, ok := jsonValue.(float64); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_FloatValue{FloatValue: float32(num)},
}, nil
}
case schema_pb.ScalarType_DOUBLE:
if num, ok := jsonValue.(float64); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_DoubleValue{DoubleValue: num},
}, nil
}
case schema_pb.ScalarType_BOOL:
if boolVal, ok := jsonValue.(bool); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_BoolValue{BoolValue: boolVal},
}, nil
}
case schema_pb.ScalarType_BYTES:
if str, ok := jsonValue.(string); ok {
return &schema_pb.Value{
Kind: &schema_pb.Value_BytesValue{BytesValue: []byte(str)},
}, nil
}
}
}
return nil, fmt.Errorf("incompatible JSON value type %T for schema type %v", jsonValue, fieldType)
}
// ConvertToSQLResult converts HybridScanResults to SQL query results
func (hms *HybridMessageScanner) ConvertToSQLResult(results []HybridScanResult, columns []string) *QueryResult {
if len(results) == 0 {
return &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{},
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// Determine columns if not specified
if len(columns) == 0 {
columnSet := make(map[string]bool)
for _, result := range results {
for columnName := range result.Values {
columnSet[columnName] = true
}
}
columns = make([]string, 0, len(columnSet))
for columnName := range columnSet {
columns = append(columns, columnName)
}
// If no data columns were found, include system columns so we have something to display
if len(columns) == 0 {
columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
}
}
// Convert to SQL rows
rows := make([][]sqltypes.Value, len(results))
for i, result := range results {
row := make([]sqltypes.Value, len(columns))
for j, columnName := range columns {
switch columnName {
case SW_COLUMN_NAME_SOURCE:
row[j] = sqltypes.NewVarChar(result.Source)
case SW_COLUMN_NAME_TIMESTAMP, SW_DISPLAY_NAME_TIMESTAMP:
// Format timestamp as proper timestamp type instead of raw nanoseconds
row[j] = hms.engine.formatTimestampColumn(result.Timestamp)
case SW_COLUMN_NAME_KEY:
row[j] = sqltypes.NewVarBinary(string(result.Key))
default:
if value, exists := result.Values[columnName]; exists {
row[j] = convertSchemaValueToSQL(value)
} else {
row[j] = sqltypes.NULL
}
}
}
rows[i] = row
}
return &QueryResult{
Columns: columns,
Rows: rows,
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// ConvertToSQLResultWithMixedColumns handles SELECT *, specific_columns queries
// Combines auto-discovered columns (from *) with explicitly requested columns
func (hms *HybridMessageScanner) ConvertToSQLResultWithMixedColumns(results []HybridScanResult, explicitColumns []string) *QueryResult {
if len(results) == 0 {
// For empty results, combine auto-discovered columns with explicit ones
columnSet := make(map[string]bool)
// Add explicit columns first
for _, col := range explicitColumns {
columnSet[col] = true
}
// Build final column list
columns := make([]string, 0, len(columnSet))
for col := range columnSet {
columns = append(columns, col)
}
return &QueryResult{
Columns: columns,
Rows: [][]sqltypes.Value{},
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// Auto-discover columns from data (like SELECT *)
autoColumns := make(map[string]bool)
for _, result := range results {
for columnName := range result.Values {
autoColumns[columnName] = true
}
}
// Combine auto-discovered and explicit columns
columnSet := make(map[string]bool)
// Add auto-discovered columns first (regular data columns)
for col := range autoColumns {
columnSet[col] = true
}
// Add explicit columns (may include system columns like _source)
for _, col := range explicitColumns {
columnSet[col] = true
}
// Build final column list
columns := make([]string, 0, len(columnSet))
for col := range columnSet {
columns = append(columns, col)
}
// If no data columns were found and no explicit columns specified, include system columns
if len(columns) == 0 {
columns = []string{SW_DISPLAY_NAME_TIMESTAMP, SW_COLUMN_NAME_KEY}
}
// Convert to SQL rows
rows := make([][]sqltypes.Value, len(results))
for i, result := range results {
row := make([]sqltypes.Value, len(columns))
for j, columnName := range columns {
switch columnName {
case SW_COLUMN_NAME_TIMESTAMP:
row[j] = sqltypes.NewInt64(result.Timestamp)
case SW_COLUMN_NAME_KEY:
row[j] = sqltypes.NewVarBinary(string(result.Key))
case SW_COLUMN_NAME_SOURCE:
row[j] = sqltypes.NewVarChar(result.Source)
default:
// Regular data column
if value, exists := result.Values[columnName]; exists {
row[j] = convertSchemaValueToSQL(value)
} else {
row[j] = sqltypes.NULL
}
}
}
rows[i] = row
}
return &QueryResult{
Columns: columns,
Rows: rows,
Database: hms.topic.Namespace,
Table: hms.topic.Name,
}
}
// ReadParquetStatistics efficiently reads column statistics from parquet files
// without scanning the full file content - uses parquet's built-in metadata
func (h *HybridMessageScanner) ReadParquetStatistics(partitionPath string) ([]*ParquetFileStats, error) {
var fileStats []*ParquetFileStats
// Use the same chunk cache as the logstore package
chunkCache := chunk_cache.NewChunkCacheInMemory(256)
lookupFileIdFn := filer.LookupFn(h.filerClient)
err := filer_pb.ReadDirAllEntries(context.Background(), h.filerClient, util.FullPath(partitionPath), "", func(entry *filer_pb.Entry, isLast bool) error {
// Only process parquet files
if entry.IsDirectory || !strings.HasSuffix(entry.Name, ".parquet") {
return nil
}
// Extract statistics from this parquet file
stats, err := h.extractParquetFileStats(entry, lookupFileIdFn, chunkCache)
if err != nil {
// Log error but continue processing other files
fmt.Printf("Warning: failed to extract stats from %s: %v\n", entry.Name, err)
return nil
}
if stats != nil {
fileStats = append(fileStats, stats)
}
return nil
})
return fileStats, err
}
// extractParquetFileStats extracts column statistics from a single parquet file
func (h *HybridMessageScanner) extractParquetFileStats(entry *filer_pb.Entry, lookupFileIdFn wdclient.LookupFileIdFunctionType, chunkCache *chunk_cache.ChunkCacheInMemory) (*ParquetFileStats, error) {
// Create reader for the parquet file
fileSize := filer.FileSize(entry)
visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(context.Background(), lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
readerAt := filer.NewChunkReaderAtFromClient(context.Background(), readerCache, chunkViews, int64(fileSize), filer.DefaultPrefetchCount)
// Create parquet reader - this only reads metadata, not data
parquetReader := parquet.NewReader(readerAt)
defer parquetReader.Close()
fileView := parquetReader.File()
fileStats := &ParquetFileStats{
FileName: entry.Name,
RowCount: fileView.NumRows(),
ColumnStats: make(map[string]*ParquetColumnStats),
}
// Populate optional min/max from filer extended attributes (writer stores ns timestamps)
if entry != nil && entry.Extended != nil {
if minBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMin]; ok && len(minBytes) == 8 {
fileStats.MinTimestampNs = int64(binary.BigEndian.Uint64(minBytes))
}
if maxBytes, ok := entry.Extended[mq.ExtendedAttrTimestampMax]; ok && len(maxBytes) == 8 {
fileStats.MaxTimestampNs = int64(binary.BigEndian.Uint64(maxBytes))
}
}
// Get schema information
schema := fileView.Schema()
// Process each row group
rowGroups := fileView.RowGroups()
for _, rowGroup := range rowGroups {
columnChunks := rowGroup.ColumnChunks()
// Process each column chunk
for i, chunk := range columnChunks {
// Get column name from schema
columnName := h.getColumnNameFromSchema(schema, i)
if columnName == "" {
continue
}
// Try to get column statistics
columnIndex, err := chunk.ColumnIndex()
if err != nil {
// No column index available - skip this column
continue
}
// Extract min/max values from the first page (for simplicity)
// In a more sophisticated implementation, we could aggregate across all pages
numPages := columnIndex.NumPages()
if numPages == 0 {
continue
}
minParquetValue := columnIndex.MinValue(0)
maxParquetValue := columnIndex.MaxValue(numPages - 1)
nullCount := int64(0)
// Aggregate null counts across all pages
for pageIdx := 0; pageIdx < numPages; pageIdx++ {
nullCount += columnIndex.NullCount(pageIdx)
}
// Convert parquet values to schema_pb.Value
minValue, err := h.convertParquetValueToSchemaValue(minParquetValue)
if err != nil {
continue
}
maxValue, err := h.convertParquetValueToSchemaValue(maxParquetValue)
if err != nil {
continue
}
// Store column statistics (aggregate across row groups if column already exists)
if existingStats, exists := fileStats.ColumnStats[columnName]; exists {
// Update existing statistics
if h.compareSchemaValues(minValue, existingStats.MinValue) < 0 {
existingStats.MinValue = minValue
}
if h.compareSchemaValues(maxValue, existingStats.MaxValue) > 0 {
existingStats.MaxValue = maxValue
}
existingStats.NullCount += nullCount
} else {
// Create new column statistics
fileStats.ColumnStats[columnName] = &ParquetColumnStats{
ColumnName: columnName,
MinValue: minValue,
MaxValue: maxValue,
NullCount: nullCount,
RowCount: rowGroup.NumRows(),
}
}
}
}
return fileStats, nil
}
// getColumnNameFromSchema extracts column name from parquet schema by index
func (h *HybridMessageScanner) getColumnNameFromSchema(schema *parquet.Schema, columnIndex int) string {
// Get the leaf columns in order
var columnNames []string
h.collectColumnNames(schema.Fields(), &columnNames)
if columnIndex >= 0 && columnIndex < len(columnNames) {
return columnNames[columnIndex]
}
return ""
}
// collectColumnNames recursively collects leaf column names from schema
func (h *HybridMessageScanner) collectColumnNames(fields []parquet.Field, names *[]string) {
for _, field := range fields {
if len(field.Fields()) == 0 {
// This is a leaf field (no sub-fields)
*names = append(*names, field.Name())
} else {
// This is a group - recurse
h.collectColumnNames(field.Fields(), names)
}
}
}
// convertParquetValueToSchemaValue converts parquet.Value to schema_pb.Value
func (h *HybridMessageScanner) convertParquetValueToSchemaValue(pv parquet.Value) (*schema_pb.Value, error) {
switch pv.Kind() {
case parquet.Boolean:
return &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: pv.Boolean()}}, nil
case parquet.Int32:
return &schema_pb.Value{Kind: &schema_pb.Value_Int32Value{Int32Value: pv.Int32()}}, nil
case parquet.Int64:
return &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: pv.Int64()}}, nil
case parquet.Float:
return &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: pv.Float()}}, nil
case parquet.Double:
return &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: pv.Double()}}, nil
case parquet.ByteArray:
return &schema_pb.Value{Kind: &schema_pb.Value_BytesValue{BytesValue: pv.ByteArray()}}, nil
default:
return nil, fmt.Errorf("unsupported parquet value kind: %v", pv.Kind())
}
}
// compareSchemaValues compares two schema_pb.Value objects
func (h *HybridMessageScanner) compareSchemaValues(v1, v2 *schema_pb.Value) int {
if v1 == nil && v2 == nil {
return 0
}
if v1 == nil {
return -1
}
if v2 == nil {
return 1
}
// Extract raw values and compare
raw1 := h.extractRawValueFromSchema(v1)
raw2 := h.extractRawValueFromSchema(v2)
return h.compareRawValues(raw1, raw2)
}
// extractRawValueFromSchema extracts the raw value from schema_pb.Value
func (h *HybridMessageScanner) extractRawValueFromSchema(value *schema_pb.Value) interface{} {
switch v := value.Kind.(type) {
case *schema_pb.Value_BoolValue:
return v.BoolValue
case *schema_pb.Value_Int32Value:
return v.Int32Value
case *schema_pb.Value_Int64Value:
return v.Int64Value
case *schema_pb.Value_FloatValue:
return v.FloatValue
case *schema_pb.Value_DoubleValue:
return v.DoubleValue
case *schema_pb.Value_BytesValue:
return string(v.BytesValue) // Convert to string for comparison
case *schema_pb.Value_StringValue:
return v.StringValue
}
return nil
}
// compareRawValues compares two raw values
func (h *HybridMessageScanner) compareRawValues(v1, v2 interface{}) int {
// Handle nil cases
if v1 == nil && v2 == nil {
return 0
}
if v1 == nil {
return -1
}
if v2 == nil {
return 1
}
// Compare based on type
switch val1 := v1.(type) {
case bool:
if val2, ok := v2.(bool); ok {
if val1 == val2 {
return 0
}
if val1 {
return 1
}
return -1
}
case int32:
if val2, ok := v2.(int32); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case int64:
if val2, ok := v2.(int64); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case float32:
if val2, ok := v2.(float32); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case float64:
if val2, ok := v2.(float64); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
case string:
if val2, ok := v2.(string); ok {
if val1 < val2 {
return -1
} else if val1 > val2 {
return 1
}
return 0
}
}
// Default: try string comparison
str1 := fmt.Sprintf("%v", v1)
str2 := fmt.Sprintf("%v", v2)
if str1 < str2 {
return -1
} else if str1 > str2 {
return 1
}
return 0
}
// streamingMerge merges multiple sorted data sources using a heap-based approach
// This provides memory-efficient merging without loading all data into memory
func (hms *HybridMessageScanner) streamingMerge(dataSources []StreamingDataSource, limit int) ([]HybridScanResult, error) {
if len(dataSources) == 0 {
return nil, nil
}
var results []HybridScanResult
mergeHeap := &StreamingMergeHeap{}
heap.Init(mergeHeap)
// Initialize heap with first item from each data source
for i, source := range dataSources {
if source.HasMore() {
result, err := source.Next()
if err != nil {
// Close all sources and return error
for _, s := range dataSources {
s.Close()
}
return nil, fmt.Errorf("failed to read from data source %d: %v", i, err)
}
if result != nil {
heap.Push(mergeHeap, &StreamingMergeItem{
Result: result,
SourceID: i,
DataSource: source,
})
}
}
}
// Process results in chronological order
for mergeHeap.Len() > 0 {
// Get next chronologically ordered result
item := heap.Pop(mergeHeap).(*StreamingMergeItem)
results = append(results, *item.Result)
// Check limit
if limit > 0 && len(results) >= limit {
break
}
// Try to get next item from the same data source
if item.DataSource.HasMore() {
nextResult, err := item.DataSource.Next()
if err != nil {
// Log error but continue with other sources
fmt.Printf("Warning: Error reading next item from source %d: %v\n", item.SourceID, err)
} else if nextResult != nil {
heap.Push(mergeHeap, &StreamingMergeItem{
Result: nextResult,
SourceID: item.SourceID,
DataSource: item.DataSource,
})
}
}
}
// Close all data sources
for _, source := range dataSources {
source.Close()
}
return results, nil
}
// SliceDataSource wraps a pre-loaded slice of results as a StreamingDataSource
// This is used for unflushed data that is already loaded into memory
type SliceDataSource struct {
results []HybridScanResult
index int
}
func NewSliceDataSource(results []HybridScanResult) *SliceDataSource {
return &SliceDataSource{
results: results,
index: 0,
}
}
func (s *SliceDataSource) Next() (*HybridScanResult, error) {
if s.index >= len(s.results) {
return nil, nil
}
result := &s.results[s.index]
s.index++
return result, nil
}
func (s *SliceDataSource) HasMore() bool {
return s.index < len(s.results)
}
func (s *SliceDataSource) Close() error {
return nil // Nothing to clean up for slice-based source
}
// StreamingFlushedDataSource provides streaming access to flushed data
type StreamingFlushedDataSource struct {
hms *HybridMessageScanner
partition topic.Partition
options HybridScanOptions
mergedReadFn func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error)
resultChan chan *HybridScanResult
errorChan chan error
doneChan chan struct{}
started bool
finished bool
closed int32 // atomic flag to prevent double close
mu sync.RWMutex
}
func NewStreamingFlushedDataSource(hms *HybridMessageScanner, partition topic.Partition, options HybridScanOptions) *StreamingFlushedDataSource {
mergedReadFn := logstore.GenMergedReadFunc(hms.filerClient, hms.topic, partition)
return &StreamingFlushedDataSource{
hms: hms,
partition: partition,
options: options,
mergedReadFn: mergedReadFn,
resultChan: make(chan *HybridScanResult, 100), // Buffer for better performance
errorChan: make(chan error, 1),
doneChan: make(chan struct{}),
started: false,
finished: false,
}
}
func (s *StreamingFlushedDataSource) startStreaming() {
if s.started {
return
}
s.started = true
go func() {
defer func() {
// Use atomic flag to ensure channels are only closed once
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
close(s.resultChan)
close(s.errorChan)
close(s.doneChan)
}
}()
// Set up time range for scanning
startTime := time.Unix(0, s.options.StartTimeNs)
if s.options.StartTimeNs == 0 {
startTime = time.Unix(0, 0)
}
stopTsNs := s.options.StopTimeNs
// For SQL queries, stopTsNs = 0 means "no stop time restriction"
// This is different from message queue consumers which want to stop at "now"
// We detect SQL context by checking if we have a predicate function
if stopTsNs == 0 && s.options.Predicate == nil {
// Only set to current time for non-SQL queries (message queue consumers)
stopTsNs = time.Now().UnixNano()
}
// If stopTsNs is still 0, it means this is a SQL query that wants unrestricted scanning
// Message processing function
eachLogEntryFn := func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
// Pre-decode DataMessage for reuse in both control check and conversion
var dataMessage *mq_pb.DataMessage
if len(logEntry.Data) > 0 {
dataMessage = &mq_pb.DataMessage{}
if err := proto.Unmarshal(logEntry.Data, dataMessage); err != nil {
dataMessage = nil // Failed to decode, treat as raw data
}
}
// Skip control entries without actual data
if s.hms.isControlEntryWithDecoded(logEntry, dataMessage) {
return false, nil // Skip this entry
}
// Convert log entry to schema_pb.RecordValue for consistent processing
recordValue, source, convertErr := s.hms.convertLogEntryToRecordValueWithDecoded(logEntry, dataMessage)
if convertErr != nil {
return false, fmt.Errorf("failed to convert log entry: %v", convertErr)
}
// Apply predicate filtering (WHERE clause)
if s.options.Predicate != nil && !s.options.Predicate(recordValue) {
return false, nil // Skip this message
}
// Extract system columns
timestamp := recordValue.Fields[SW_COLUMN_NAME_TIMESTAMP].GetInt64Value()
key := recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue()
// Apply column projection
values := make(map[string]*schema_pb.Value)
if len(s.options.Columns) == 0 {
// Select all columns (excluding system columns from user view)
for name, value := range recordValue.Fields {
if name != SW_COLUMN_NAME_TIMESTAMP && name != SW_COLUMN_NAME_KEY {
values[name] = value
}
}
} else {
// Select specified columns only
for _, columnName := range s.options.Columns {
if value, exists := recordValue.Fields[columnName]; exists {
values[columnName] = value
}
}
}
result := &HybridScanResult{
Values: values,
Timestamp: timestamp,
Key: key,
Source: source,
}
// Check if already closed before trying to send
if atomic.LoadInt32(&s.closed) != 0 {
return true, nil // Stop processing if closed
}
// Send result to channel with proper handling of closed channels
select {
case s.resultChan <- result:
return false, nil
case <-s.doneChan:
return true, nil // Stop processing if closed
default:
// Check again if closed (in case it was closed between the atomic check and select)
if atomic.LoadInt32(&s.closed) != 0 {
return true, nil
}
// If not closed, try sending again with blocking select
select {
case s.resultChan <- result:
return false, nil
case <-s.doneChan:
return true, nil
}
}
}
// Start scanning from the specified position
startPosition := log_buffer.MessagePosition{Time: startTime}
_, _, err := s.mergedReadFn(startPosition, stopTsNs, eachLogEntryFn)
if err != nil {
// Only try to send error if not already closed
if atomic.LoadInt32(&s.closed) == 0 {
select {
case s.errorChan <- fmt.Errorf("flushed data scan failed: %v", err):
case <-s.doneChan:
default:
// Channel might be full or closed, ignore
}
}
}
s.finished = true
}()
}
func (s *StreamingFlushedDataSource) Next() (*HybridScanResult, error) {
if !s.started {
s.startStreaming()
}
select {
case result, ok := <-s.resultChan:
if !ok {
return nil, nil // No more results
}
return result, nil
case err := <-s.errorChan:
return nil, err
case <-s.doneChan:
return nil, nil
}
}
func (s *StreamingFlushedDataSource) HasMore() bool {
if !s.started {
return true // Haven't started yet, so potentially has data
}
return !s.finished || len(s.resultChan) > 0
}
func (s *StreamingFlushedDataSource) Close() error {
// Use atomic flag to ensure channels are only closed once
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
close(s.doneChan)
close(s.resultChan)
close(s.errorChan)
}
return nil
}
// mergeSort efficiently sorts HybridScanResult slice by timestamp using merge sort algorithm
func (hms *HybridMessageScanner) mergeSort(results []HybridScanResult, left, right int) {
if left < right {
mid := left + (right-left)/2
// Recursively sort both halves
hms.mergeSort(results, left, mid)
hms.mergeSort(results, mid+1, right)
// Merge the sorted halves
hms.merge(results, left, mid, right)
}
}
// merge combines two sorted subarrays into a single sorted array
func (hms *HybridMessageScanner) merge(results []HybridScanResult, left, mid, right int) {
// Create temporary arrays for the two subarrays
leftArray := make([]HybridScanResult, mid-left+1)
rightArray := make([]HybridScanResult, right-mid)
// Copy data to temporary arrays
copy(leftArray, results[left:mid+1])
copy(rightArray, results[mid+1:right+1])
// Merge the temporary arrays back into results[left..right]
i, j, k := 0, 0, left
for i < len(leftArray) && j < len(rightArray) {
if leftArray[i].Timestamp <= rightArray[j].Timestamp {
results[k] = leftArray[i]
i++
} else {
results[k] = rightArray[j]
j++
}
k++
}
// Copy remaining elements of leftArray, if any
for i < len(leftArray) {
results[k] = leftArray[i]
i++
k++
}
// Copy remaining elements of rightArray, if any
for j < len(rightArray) {
results[k] = rightArray[j]
j++
k++
}
}