
SeaweedMQ Message Queue on SeaweedFS (WIP, not ready)

What are the use cases it is designed for?

Message queues are like water pipes. Messages flow in the pipes to their destinations.

However, what if a flood comes? Of course, you can increase the number of partitions, add more brokers, restart, and watch the traffic level closely.

Sometimes the flood is expected. For example, you may backfill old data in a batch and then switch to online messages. In that case you want enough brokers to handle the data, and fewer brokers later to cut costs.

SeaweedMQ is designed for use cases that need to:

  • Receive and save a large number of messages.
  • Handle traffic spikes automatically.

What is special about SeaweedMQ?

  • Separate computation and storage nodes to scale independently.
    • Unlimited storage space by adding volume servers.
    • Unlimited message brokers to handle incoming messages.
    • Offline messages can be handled as normal files.
  • Scale up and down by automatically splitting and merging message topics.
    • Topics can automatically split into segments when traffic increases, and vice versa.
  • Pass messages by reference instead of copying.
    • Clients can optionally upload the messages first and just submit the references.
    • Drastically reduces the broker load.
  • Stateless brokers
    • All brokers are equal. One broker is dynamically picked as the leader.
    • Add brokers at any time.
    • Brokers can be restarted in a rolling fashion or removed gradually.

Design

How does it work?

Brokers are just computation nodes without storage. When a broker starts, it reports itself to the masters. The masters then select one of the brokers as the leader.
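The README does not say how the masters choose the leader. The sketch below is only one possible approach. It assumes a hypothetical master-side BrokerRegistry that picks the live broker with the smallest address. Brokers hold no state, so picking a new leader after a failure just means running the same selection again.

```go
package main

import (
	"fmt"
	"sort"
)

// BrokerRegistry is a hypothetical master-side view of live brokers.
// Choosing the smallest address as leader is an assumption for illustration;
// the README only says the masters select one broker.
type BrokerRegistry struct {
	brokers map[string]bool // broker address -> registered
}

func NewBrokerRegistry() *BrokerRegistry {
	return &BrokerRegistry{brokers: make(map[string]bool)}
}

// Register records a broker that reported itself on startup.
func (r *BrokerRegistry) Register(addr string) { r.brokers[addr] = true }

// Remove drops a broker that failed or was decommissioned.
func (r *BrokerRegistry) Remove(addr string) { delete(r.brokers, addr) }

// Leader returns the current leader, or "" when no broker is registered.
// Because brokers are stateless, calling it again after a failure is
// enough to pick a replacement.
func (r *BrokerRegistry) Leader() string {
	addrs := make([]string, 0, len(r.brokers))
	for a := range r.brokers {
		addrs = append(addrs, a)
	}
	if len(addrs) == 0 {
		return ""
	}
	sort.Strings(addrs)
	return addrs[0]
}

func main() {
	r := NewBrokerRegistry()
	r.Register("broker-b")
	r.Register("broker-a")
	fmt.Println(r.Leader()) // broker-a
	r.Remove("broker-a")    // the leader fails
	fmt.Println(r.Leader()) // broker-b
}
```

Any deterministic rule would work here. What matters is that every master computes the same leader from the same set of live brokers.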

Each topic needs to define a partition key for its messages.

Messages for a topic are divided into segments, and each segment covers a range of partitions. A segment can be split into two segments, or two neighboring segments can be merged back into one segment.
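The relationship between partition keys, partitions, and segments can be sketched as follows. The partition count (NumPartitions), the FNV-1a hash, and all names here are hypothetical choices, not SeaweedMQ's actual scheme. Each segment is modeled as a half-open partition range [Start, Stop). Splitting halves a range, and merging joins two adjacent ranges.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// NumPartitions is a hypothetical fixed partition space for a topic.
const NumPartitions = 4096

// Segment covers the half-open partition range [Start, Stop).
type Segment struct {
	Start, Stop int
}

// PartitionOf hashes a message's partition key into [0, NumPartitions).
// FNV-1a is used only as an example hash.
func PartitionOf(key []byte) int {
	h := fnv.New32a()
	h.Write(key) // hash.Hash writes never return an error
	return int(h.Sum32() % NumPartitions)
}

// FindSegment returns the index of the segment covering partition p.
// segments must be sorted by Start and together cover [0, NumPartitions).
func FindSegment(segments []Segment, p int) int {
	return sort.Search(len(segments), func(i int) bool { return segments[i].Stop > p })
}

// Split divides a segment into two halves; ok is false if it has fewer
// than two partitions and cannot be split.
func Split(s Segment) (left, right Segment, ok bool) {
	if s.Stop-s.Start < 2 {
		return Segment{}, Segment{}, false
	}
	mid := s.Start + (s.Stop-s.Start)/2
	return Segment{s.Start, mid}, Segment{mid, s.Stop}, true
}

// Merge joins two neighboring segments; ok is false if they are not adjacent.
func Merge(a, b Segment) (Segment, bool) {
	if a.Stop != b.Start {
		return Segment{}, false
	}
	return Segment{a.Start, b.Stop}, true
}

func main() {
	segs := []Segment{{0, 2048}, {2048, NumPartitions}}
	p := PartitionOf([]byte("user-42"))
	fmt.Println(p, segs[FindSegment(segs, p)])
	left, right, _ := Split(segs[0])
	fmt.Println(left, right) // {0 1024} {1024 2048}
	merged, ok := Merge(left, right)
	fmt.Println(merged, ok) // {0 2048} true
}
```

Because splitting and merging only redraw range boundaries, a given key always maps to the same partition. Only the segment that covers that partition changes.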

When writing, the client asks the broker leader for a few brokers to process the segment.

The broker leader checks whether brokers have already been assigned to the segment. If not, it selects a few brokers based on their load, saves the selection to the filer, and returns it to the client.

The client will write the messages for this segment to the selected brokers.
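The write path above can be sketched from the broker leader's side. This is a minimal illustration under assumptions. The Leader type, the SegmentID format, and the "n least-loaded brokers" rule are all hypothetical, and an in-memory map stands in for the selection saved in the filer. The first request for a segment picks brokers by load. Later requests reuse the saved choice.

```go
package main

import (
	"fmt"
	"sort"
)

// SegmentID identifies a segment, e.g. a hypothetical "topic/start-stop" key.
type SegmentID string

// Leader sketches the broker leader's assignment logic.
type Leader struct {
	load        map[string]int         // broker -> most recently reported load
	assignments map[SegmentID][]string // stand-in for selections saved in the filer
}

// AssignBrokers returns the brokers for seg. The first call picks the n
// least-loaded brokers and saves the choice; later calls reuse it.
func (l *Leader) AssignBrokers(seg SegmentID, n int) []string {
	if brokers, ok := l.assignments[seg]; ok {
		return brokers
	}
	candidates := make([]string, 0, len(l.load))
	for b := range l.load {
		candidates = append(candidates, b)
	}
	// Sort by load, breaking ties by name so the result is deterministic.
	sort.Slice(candidates, func(i, j int) bool {
		li, lj := l.load[candidates[i]], l.load[candidates[j]]
		if li != lj {
			return li < lj
		}
		return candidates[i] < candidates[j]
	})
	if n > len(candidates) {
		n = len(candidates)
	}
	chosen := candidates[:n]
	l.assignments[seg] = chosen // "save the selection into filer"
	return chosen
}

func main() {
	l := &Leader{
		load:        map[string]int{"b1": 30, "b2": 10, "b3": 20},
		assignments: map[SegmentID][]string{},
	}
	fmt.Println(l.AssignBrokers("events/0-2048", 2)) // [b2 b3]
	l.load["b2"] = 99                                // later load reports do not move an existing assignment
	fmt.Println(l.AssignBrokers("events/0-2048", 2)) // [b2 b3]
}
```

Saving the selection is what makes the leader replaceable. A new leader can read the existing assignments back from the filer instead of holding them in memory.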

Failover

The broker leader holds no state. If it fails, the masters select a different broker as the leader.

If any of the brokers selected for a segment goes down, the remaining brokers should try to write the messages they have received to the filer and close the segment to the clients.

The clients should then start a new segment, and the masters should assign other healthy brokers to handle it.

This way, any broker can go down without losing data.

Auto Split or Merge

(The idea is borrowed from Pravega.)

The brokers should periodically report their traffic load to the broker leader.

If any segment has too much load, the broker leader asks the brokers to tell the client to close the current segment and create two new segments.

If two neighboring segments have a combined load below the average load per segment, the broker leader asks the brokers to tell the client to close these two segments and create a single new segment.