fix(kafka): resolve consumer group resumption timeout in e2e tests (#8935)

* fix(kafka): resolve consumer group resumption timeout in e2e tests

Three issues caused ConsumerGroupResumption to time out when the second
consumer tried to resume from committed offsets:

1. ForceCompleteRebalance deadlock: performCleanup() held group.Mu.Lock,
   then called ForceCompleteRebalance(), which tried to acquire the same
   lock — a guaranteed deadlock on Go's non-reentrant sync.Mutex. Fixed
   by requiring callers to hold the lock (matching actual call sites);
   a minimal sketch of the pattern follows this list.

2. Unbounded fallback fetch: when the multi-batch fetch timed out, the
   fallback GetStoredRecords call used the connection context (no
   deadline). A slow broker gRPC call could block the data-plane
   goroutine indefinitely, causing head-of-line blocking for all
   responses on that connection. Fixed with a 10-second timeout.

3. HWM lookup failure caused empty responses: after a consumer leaves
   and the partition is deactivated, GetLatestOffset can fail. The
   fetch handler treated this as "no data" and entered the long-poll
   loop (up to 10s × 4 retries = 40s timeout). Fixed by assuming data
   may exist when HWM lookup fails, so the actual fetch determines
   availability.
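
For context, a minimal self-contained sketch of the deadlock in issue 1
and the caller-holds-lock fix (hypothetical standalone names, not the
actual gateway types):

    package main

    import "sync"

    type group struct {
        mu    sync.Mutex
        state string
    }

    // Buggy shape: the helper takes the lock itself.
    func forceCompleteLocking(g *group) {
        g.mu.Lock() // blocks forever if the caller already holds g.mu
        defer g.mu.Unlock()
        g.state = "stable"
    }

    // Fixed shape: the caller must already hold g.mu (documented contract).
    func forceComplete(g *group) {
        g.state = "stable"
    }

    func performCleanup(g *group) {
        g.mu.Lock()
        defer g.mu.Unlock()
        // forceCompleteLocking(g) // would deadlock: sync.Mutex is not reentrant
        forceComplete(g) // safe: the lock is held exactly once
    }

    func main() {
        performCleanup(&group{})
    }

Go mutexes give no recursion and no runtime error on re-lock from the
same goroutine, so the documented-contract convention is the standard
way out of this pattern.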

* fix(kafka): address review feedback on HWM sentinel and fallback timeout

- Don't expose synthetic HWM (requestedOffset+1) to clients; keep
  result.highWaterMark at 0 when the real HWM lookup fails.
- Tie fallback timeout to client's MaxWaitTime instead of a fixed 10s,
  so one slow partition doesn't hold the reader beyond the request budget.

* fix(kafka): use large HWM sentinel and clamp fallback timeout

- Use requestedOffset+10000 as sentinel HWM instead of +1, so
  FetchMultipleBatches doesn't artificially limit to 1 record.
- Add 2s floor to fallback timeout so disk reads via gRPC have
  a reasonable chance even when maxWaitMs is small or zero.

* fix(kafka): use MaxInt64 sentinel and derive HWM from fetch result

- Use math.MaxInt64 as HWM sentinel to avoid integer overflow risk
  (previously requestedOffset+10000 could wrap on large offsets).
- After the fetch, derive a meaningful HWM from newOffset so the
  client never sees MaxInt64 or 0 in the response.
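
A hedged illustration of the overflow risk (standalone example, not code
from the patch): adding a fixed delta to an offset near the int64
ceiling wraps negative in Go, while math.MaxInt64 still compares greater
than any real offset:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        requestedOffset := int64(math.MaxInt64 - 5000)
        wrapped := requestedOffset + 10000 // int64 addition wraps on overflow
        fmt.Println(wrapped > requestedOffset)               // false: the +10000 sentinel breaks
        fmt.Println(int64(math.MaxInt64) >= requestedOffset) // true: MaxInt64 is a safe sentinel
    }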

* fix(kafka): use remaining time budget for fallback fetch

The fallback was restarting the full maxWaitMs budget even though the
multi-batch fetch had already consumed part of it. Now compute the
remaining time from either the parent context deadline or maxWaitMs
minus elapsed time, skip the fallback if the budget is exhausted, and
clamp the result to [2s, 10s]. A sketch of the computation follows.
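
As a sketch of that budget computation (hypothetical helper name and
signature, including the maxWaitMs type; assumes the 2s floor and 10s
ceiling this patch settles on):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // fallbackBudget returns the timeout for the fallback fetch, or ok=false
    // when the request budget is already exhausted. It prefers the parent
    // context's deadline; otherwise it uses maxWaitMs minus elapsed time.
    func fallbackBudget(ctx context.Context, maxWaitMs int32, elapsed time.Duration) (timeout time.Duration, ok bool) {
        var remaining time.Duration
        if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
            remaining = time.Until(deadline)
        } else {
            remaining = time.Duration(maxWaitMs)*time.Millisecond - elapsed
        }
        if remaining <= 0 {
            return 0, false // budget exhausted: skip the fallback entirely
        }
        // Clamp to [2s, 10s]: give disk reads over gRPC a realistic chance,
        // but bound how long the data-plane goroutine may block.
        if remaining < 2*time.Second {
            remaining = 2 * time.Second
        }
        if remaining > 10*time.Second {
            remaining = 10 * time.Second
        }
        return remaining, true
    }

    func main() {
        // 5000ms client budget, 1200ms already spent: 3.8s remains, inside the clamp.
        if timeout, ok := fallbackBudget(context.Background(), 5000, 1200*time.Millisecond); ok {
            fmt.Println(timeout) // 3.8s
        }
    }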
Author: Chris Lu
Date: 2026-04-05 20:13:57 -07:00
Committed by: GitHub
Commit: 7ab6306e15 (parent 72eb93919c)

4 changed files with 62 additions and 20 deletions

File 1 of 4: rebalance timeout manager (ForceCompleteRebalance)

@@ -116,11 +116,9 @@ func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRe
 	return time.Since(group.LastActivity) > maxRebalanceDuration
 }
 
-// ForceCompleteRebalance forces completion of a stuck rebalance
+// ForceCompleteRebalance forces completion of a stuck rebalance.
+// IMPORTANT: The caller must already hold group.Mu.Lock().
 func (rtm *RebalanceTimeoutManager) ForceCompleteRebalance(group *ConsumerGroup) {
-	group.Mu.Lock()
-	defer group.Mu.Unlock()
-
 	// If stuck in preparing rebalance, move to completing
 	if group.State == GroupStatePreparingRebalance {
 		group.State = GroupStateCompletingRebalance

File 2 of 4: rebalance timeout manager tests

@@ -185,20 +185,19 @@ func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) {
 		State: MemberStatePending,
 	}
 	group.Members["member1"] = member
-	group.Mu.Unlock()
 
+	// ForceCompleteRebalance expects the caller to hold group.Mu.Lock()
 	rtm.ForceCompleteRebalance(group)
 
-	group.Mu.RLock()
 	if group.State != GroupStateCompletingRebalance {
 		t.Errorf("Expected group state to be CompletingRebalance, got %s", group.State.String())
 	}
-	group.Mu.RUnlock()
+	group.Mu.Unlock()
 
 	// Test forcing completion from CompletingRebalance
+	group.Mu.Lock()
 	rtm.ForceCompleteRebalance(group)
 
-	group.Mu.RLock()
 	if group.State != GroupStateStable {
 		t.Errorf("Expected group state to be Stable, got %s", group.State.String())
 	}
@@ -206,7 +205,7 @@ func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) {
 	if member.State != MemberStateStable {
 		t.Errorf("Expected member state to be Stable, got %s", member.State.String())
 	}
-	group.Mu.RUnlock()
+	group.Mu.Unlock()
 }
 
 func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) {

File 3 of 4: fetch handler long-poll availability check (handleFetch)

@@ -55,7 +55,10 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers
 	for _, partition := range topic.Partitions {
 		hwm, err := h.seaweedMQHandler.GetLatestOffset(topic.Name, partition.PartitionID)
 		if err != nil {
-			continue
+			// HWM lookup failed (e.g. partition deactivated between consumer
+			// sessions). Assume data may be available rather than blocking in
+			// the long-poll loop — the actual fetch will determine the truth.
+			return true
 		}
 		// Normalize fetch offset
 		effectiveOffset := partition.FetchOffset

File 4 of 4: partition reader (protocol package)

@@ -2,6 +2,7 @@ package protocol
 
 import (
 	"context"
+	"math"
 	"sync"
 	"time"
@@ -136,13 +137,19 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	}()
 
 	// Get high water mark
+	hwmUnknown := false
 	hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID)
 	if hwmErr != nil {
-		glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v",
+		// HWM lookup can fail when the partition has been deactivated between consumer
+		// sessions. Proceed with the fetch anyway — the broker will return the correct
+		// data (or empty) based on its own state. Use math.MaxInt64 as sentinel so
+		// FetchMultipleBatches doesn't artificially cap recordsAvailable, and we
+		// don't hit the early-return below. The actual HWM will be derived from
+		// the fetch result (newOffset) after the read.
+		glog.Warningf("[%s] HWM lookup failed for %s[%d]: %v — will attempt fetch anyway",
 			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr)
-		result.recordBatch = []byte{}
-		result.highWaterMark = 0
-		return
+		hwm = math.MaxInt64
+		hwmUnknown = true
 	}
 
 	result.highWaterMark = hwm
@@ -170,10 +177,20 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition
 	// Fetch on-demand - no pre-fetching to avoid overwhelming the broker
 	recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm)
 
+	// When HWM was unknown, derive a reasonable value from the fetch result
+	// so the client sees a meaningful high water mark instead of MaxInt64.
+	if hwmUnknown {
+		if newOffset > req.requestedOffset {
+			result.highWaterMark = newOffset // best estimate: end of what we read
+		} else {
+			result.highWaterMark = req.requestedOffset // no data found
+		}
+	}
+
 	// Log what we got back - DETAILED for diagnostics
 	if len(recordBatch) == 0 {
 		glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)",
-			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm)
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, result.highWaterMark)
 		result.recordBatch = []byte{}
 	} else {
 		result.recordBatch = recordBatch
@@ -228,15 +245,40 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma
 		return fetchResult.RecordBatches, fetchResult.NextOffset
 	}
 
-	// Multi-batch failed - try single batch WITHOUT the timeout constraint
-	// to ensure we get at least some data even if multi-batch timed out
+	// Multi-batch failed - try single batch with a fresh timeout
 	glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)",
 		pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err)
 
-	// Use original context for fallback, NOT the timed-out fetchCtx
-	// This ensures the fallback has a fresh chance to fetch data
+	// Compute the remaining time budget for the fallback. If the parent
+	// context carries a deadline, honour it; otherwise derive remaining
+	// from maxWaitMs minus elapsed time. This prevents the fallback from
+	// restarting the full budget after the multi-batch fetch already
+	// consumed part of it.
+	var remaining time.Duration
+	if deadline, ok := ctx.Deadline(); ok {
+		remaining = time.Until(deadline)
+	} else {
+		remaining = time.Duration(maxWaitMs)*time.Millisecond - time.Since(fetchStartTime)
+	}
+	if remaining <= 0 {
+		// Budget exhausted — skip the fallback entirely.
+		glog.V(2).Infof("[%s] No remaining budget for fallback on %s[%d] (maxWait=%dms, elapsed=%v)",
+			pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, maxWaitMs, time.Since(fetchStartTime))
+		return []byte{}, fromOffset
+	}
+	// Clamp: floor of 2s so disk reads via gRPC have a realistic chance,
+	// but never exceed 10s to bound data-plane blocking.
+	fallbackTimeout := remaining
+	if fallbackTimeout < 2*time.Second {
+		fallbackTimeout = 2 * time.Second
+	}
+	if fallbackTimeout > 10*time.Second {
+		fallbackTimeout = 10 * time.Second
+	}
+	fallbackCtx, fallbackCancel := context.WithTimeout(ctx, fallbackTimeout)
+	defer fallbackCancel()
 
 	fallbackStartTime := time.Now()
-	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10)
+	smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fallbackCtx, pr.topicName, pr.partitionID, fromOffset, 10)
 	fallbackDuration := time.Since(fallbackStartTime)
 	if fallbackDuration > 2*time.Second {