* fix(kafka): resolve consumer group resumption timeout in e2e tests
Three issues caused ConsumerGroupResumption to time out when the second
consumer tried to resume from committed offsets:
1. ForceCompleteRebalance deadlock: performCleanup() acquired group.Mu
and then called ForceCompleteRebalance(), which tried to acquire the
same lock — a guaranteed deadlock, since Go's sync.Mutex is not
reentrant. Fixed by requiring callers to hold the lock (matching the
actual call sites).
2. Unbounded fallback fetch: when the multi-batch fetch timed out, the
fallback GetStoredRecords call used the connection context (no
deadline). A slow broker gRPC call could block the data-plane
goroutine indefinitely, causing head-of-line blocking for all
responses on that connection. Fixed with a 10-second timeout.
3. HWM lookup failure caused empty responses: after a consumer leaves
and the partition is deactivated, GetLatestOffset can fail. The
fetch handler treated this as "no data" and entered the long-poll
loop (up to 10s × 4 retries = 40s timeout). Fixed by assuming data
may exist when HWM lookup fails, so the actual fetch determines
availability.
* fix(kafka): address review feedback on HWM sentinel and fallback timeout
- Don't expose synthetic HWM (requestedOffset+1) to clients; keep
result.highWaterMark at 0 when the real HWM lookup fails.
- Tie fallback timeout to client's MaxWaitTime instead of a fixed 10s,
so one slow partition doesn't hold the reader beyond the request budget.
* fix(kafka): use large HWM sentinel and clamp fallback timeout
- Use requestedOffset+10000 as sentinel HWM instead of +1, so
FetchMultipleBatches doesn't artificially limit to 1 record.
- Add 2s floor to fallback timeout so disk reads via gRPC have
a reasonable chance even when maxWaitMs is small or zero.
* fix(kafka): use MaxInt64 sentinel and derive HWM from fetch result
- Use math.MaxInt64 as HWM sentinel to avoid integer overflow risk
(previously requestedOffset+10000 could wrap on large offsets).
- After the fetch, derive a meaningful HWM from newOffset so the
client never sees MaxInt64 or 0 in the response.
* fix(kafka): use remaining time budget for fallback fetch
The fallback was restarting with the full maxWaitMs budget even though
the multi-batch fetch had already consumed part of it. Now compute the
remaining time from either the parent context deadline or maxWaitMs
minus elapsed time, skip the fallback entirely if the budget is
exhausted, and clamp the result to [2s, 10s].