diff --git a/weed/mq/kafka/consumer/rebalance_timeout.go b/weed/mq/kafka/consumer/rebalance_timeout.go index f4f65f37b..ce8d95d84 100644 --- a/weed/mq/kafka/consumer/rebalance_timeout.go +++ b/weed/mq/kafka/consumer/rebalance_timeout.go @@ -116,11 +116,9 @@ func (rtm *RebalanceTimeoutManager) IsRebalanceStuck(group *ConsumerGroup, maxRe return time.Since(group.LastActivity) > maxRebalanceDuration } -// ForceCompleteRebalance forces completion of a stuck rebalance +// ForceCompleteRebalance forces completion of a stuck rebalance. +// IMPORTANT: The caller must already hold group.Mu.Lock(). func (rtm *RebalanceTimeoutManager) ForceCompleteRebalance(group *ConsumerGroup) { - group.Mu.Lock() - defer group.Mu.Unlock() - // If stuck in preparing rebalance, move to completing if group.State == GroupStatePreparingRebalance { group.State = GroupStateCompletingRebalance diff --git a/weed/mq/kafka/consumer/rebalance_timeout_test.go b/weed/mq/kafka/consumer/rebalance_timeout_test.go index 61dbf3fc5..8278d1972 100644 --- a/weed/mq/kafka/consumer/rebalance_timeout_test.go +++ b/weed/mq/kafka/consumer/rebalance_timeout_test.go @@ -185,20 +185,19 @@ func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) { State: MemberStatePending, } group.Members["member1"] = member - group.Mu.Unlock() + // ForceCompleteRebalance expects the caller to hold group.Mu.Lock() rtm.ForceCompleteRebalance(group) - group.Mu.RLock() if group.State != GroupStateCompletingRebalance { t.Errorf("Expected group state to be CompletingRebalance, got %s", group.State.String()) } - group.Mu.RUnlock() + group.Mu.Unlock() // Test forcing completion from CompletingRebalance + group.Mu.Lock() rtm.ForceCompleteRebalance(group) - group.Mu.RLock() if group.State != GroupStateStable { t.Errorf("Expected group state to be Stable, got %s", group.State.String()) } @@ -206,7 +205,7 @@ func TestRebalanceTimeoutManager_ForceCompleteRebalance(t *testing.T) { if member.State != MemberStateStable { t.Errorf("Expected member state to be Stable, got %s", member.State.String()) } - group.Mu.RUnlock() + group.Mu.Unlock() } func TestRebalanceTimeoutManager_GetRebalanceStatus(t *testing.T) { diff --git a/weed/mq/kafka/protocol/fetch.go b/weed/mq/kafka/protocol/fetch.go index 58a96f5d8..a6d317e11 100644 --- a/weed/mq/kafka/protocol/fetch.go +++ b/weed/mq/kafka/protocol/fetch.go @@ -55,7 +55,10 @@ func (h *Handler) handleFetch(ctx context.Context, correlationID uint32, apiVers for _, partition := range topic.Partitions { hwm, err := h.seaweedMQHandler.GetLatestOffset(topic.Name, partition.PartitionID) if err != nil { - continue + // HWM lookup failed (e.g. partition deactivated between consumer + // sessions). Assume data may be available rather than blocking in + // the long-poll loop — the actual fetch will determine the truth. + return true } // Normalize fetch offset effectiveOffset := partition.FetchOffset diff --git a/weed/mq/kafka/protocol/fetch_partition_reader.go b/weed/mq/kafka/protocol/fetch_partition_reader.go index 6583c6489..33167921b 100644 --- a/weed/mq/kafka/protocol/fetch_partition_reader.go +++ b/weed/mq/kafka/protocol/fetch_partition_reader.go @@ -2,6 +2,7 @@ package protocol import ( "context" + "math" "sync" "time" @@ -136,13 +137,19 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition }() // Get high water mark + hwmUnknown := false hwm, hwmErr := pr.handler.seaweedMQHandler.GetLatestOffset(pr.topicName, pr.partitionID) if hwmErr != nil { - glog.Errorf("[%s] CRITICAL: Failed to get HWM for %s[%d]: %v", + // HWM lookup can fail when the partition has been deactivated between consumer + // sessions. Proceed with the fetch anyway — the broker will return the correct + // data (or empty) based on its own state. Use math.MaxInt64 as sentinel so + // FetchMultipleBatches doesn't artificially cap recordsAvailable, and we + // don't hit the early-return below. The actual HWM will be derived from + // the fetch result (newOffset) after the read. + glog.Warningf("[%s] HWM lookup failed for %s[%d]: %v — will attempt fetch anyway", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, hwmErr) - result.recordBatch = []byte{} - result.highWaterMark = 0 - return + hwm = math.MaxInt64 + hwmUnknown = true } result.highWaterMark = hwm @@ -170,10 +177,20 @@ func (pr *partitionReader) serveFetchRequest(ctx context.Context, req *partition // Fetch on-demand - no pre-fetching to avoid overwhelming the broker recordBatch, newOffset := pr.readRecords(ctx, req.requestedOffset, req.maxBytes, req.maxWaitMs, hwm) + // When HWM was unknown, derive a reasonable value from the fetch result + // so the client sees a meaningful high water mark instead of MaxInt64. + if hwmUnknown { + if newOffset > req.requestedOffset { + result.highWaterMark = newOffset // best estimate: end of what we read + } else { + result.highWaterMark = req.requestedOffset // no data found + } + } + // Log what we got back - DETAILED for diagnostics if len(recordBatch) == 0 { glog.V(2).Infof("[%s] FETCH %s[%d]: readRecords returned EMPTY (offset=%d, hwm=%d)", - pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, hwm) + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, req.requestedOffset, result.highWaterMark) result.recordBatch = []byte{} } else { result.recordBatch = recordBatch @@ -228,15 +245,40 @@ func (pr *partitionReader) readRecords(ctx context.Context, fromOffset int64, ma return fetchResult.RecordBatches, fetchResult.NextOffset } - // Multi-batch failed - try single batch WITHOUT the timeout constraint - // to ensure we get at least some data even if multi-batch timed out + // Multi-batch failed - try single batch with a fresh timeout glog.Warningf("[%s] Multi-batch fetch failed for %s[%d] offset=%d after %v, falling back to single-batch (err: %v)", pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, fromOffset, fetchDuration, err) - // Use original context for fallback, NOT the timed-out fetchCtx - // This ensures the fallback has a fresh chance to fetch data + // Compute the remaining time budget for the fallback. If the parent + // context carries a deadline, honour it; otherwise derive remaining + // from maxWaitMs minus elapsed time. This prevents the fallback from + // restarting the full budget after the multi-batch fetch already + // consumed part of it. + var remaining time.Duration + if deadline, ok := ctx.Deadline(); ok { + remaining = time.Until(deadline) + } else { + remaining = time.Duration(maxWaitMs)*time.Millisecond - time.Since(fetchStartTime) + } + if remaining <= 0 { + // Budget exhausted — skip the fallback entirely. + glog.V(2).Infof("[%s] No remaining budget for fallback on %s[%d] (maxWait=%dms, elapsed=%v)", + pr.connCtx.ConnectionID, pr.topicName, pr.partitionID, maxWaitMs, time.Since(fetchStartTime)) + return []byte{}, fromOffset + } + // Clamp: floor of 2s so disk reads via gRPC have a realistic chance, + // but never exceed 10s to bound data-plane blocking. + fallbackTimeout := remaining + if fallbackTimeout < 2*time.Second { + fallbackTimeout = 2 * time.Second + } + if fallbackTimeout > 10*time.Second { + fallbackTimeout = 10 * time.Second + } + fallbackCtx, fallbackCancel := context.WithTimeout(ctx, fallbackTimeout) + defer fallbackCancel() fallbackStartTime := time.Now() - smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(ctx, pr.topicName, pr.partitionID, fromOffset, 10) + smqRecords, err := pr.handler.seaweedMQHandler.GetStoredRecords(fallbackCtx, pr.topicName, pr.partitionID, fromOffset, 10) fallbackDuration := time.Since(fallbackStartTime) if fallbackDuration > 2*time.Second {