fix: skip exhausted blocks before creating an interval (#8180)

* fix: skip exhausted blocks before creating an interval

* refactor: optimize interval creation and fix logic duplication

* docs: add docstring for LocateData

* refactor: extract moveToNextBlock helper to deduplicate logic

* fix: use int64 for block index comparison to prevent overflow

* test: add unit test for LocateData boundary crossing (issue #8179)

* fix: skip exhausted blocks to prevent negative interval size and panics (issue #8179)

* refactor: apply review suggestions for test maintainability and code style
This commit is contained in:
Chris Lu
2026-02-02 11:12:31 -08:00
committed by GitHub
parent 621834d96a
commit f23e09f58b
2 changed files with 41 additions and 10 deletions

View File

@@ -12,10 +12,23 @@ type Interval struct {
LargeBlockRowsCount int
}
// LocateData finds the intervals of data within erasure coding blocks for a given offset and size.
func LocateData(largeBlockLength, smallBlockLength int64, shardDatSize int64, offset int64, size types.Size) (intervals []Interval) {
blockIndex, isLargeBlock, nLargeBlockRows, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, shardDatSize, offset)
for size > 0 {
blockRemaining := largeBlockLength - innerBlockOffset
if !isLargeBlock {
blockRemaining = smallBlockLength - innerBlockOffset
}
if blockRemaining <= 0 {
// move to next block
blockIndex, isLargeBlock = moveToNextBlock(blockIndex, isLargeBlock, nLargeBlockRows)
innerBlockOffset = 0
continue
}
interval := Interval{
BlockIndex: blockIndex,
InnerBlockOffset: innerBlockOffset,
@@ -23,11 +36,6 @@ func LocateData(largeBlockLength, smallBlockLength int64, shardDatSize int64, of
LargeBlockRowsCount: int(nLargeBlockRows),
}
blockRemaining := largeBlockLength - innerBlockOffset
if !isLargeBlock {
blockRemaining = smallBlockLength - innerBlockOffset
}
if int64(size) <= blockRemaining {
interval.Size = size
intervals = append(intervals, interval)
@@ -37,17 +45,23 @@ func LocateData(largeBlockLength, smallBlockLength int64, shardDatSize int64, of
intervals = append(intervals, interval)
size -= interval.Size
blockIndex += 1
if isLargeBlock && blockIndex == interval.LargeBlockRowsCount*DataShardsCount {
isLargeBlock = false
blockIndex = 0
}
blockIndex, isLargeBlock = moveToNextBlock(blockIndex, isLargeBlock, nLargeBlockRows)
innerBlockOffset = 0
}
return
}
func moveToNextBlock(blockIndex int, isLargeBlock bool, nLargeBlockRows int64) (int, bool) {
nextBlockIndex := blockIndex + 1
nextIsLargeBlock := isLargeBlock
if isLargeBlock && int64(nextBlockIndex) == nLargeBlockRows*DataShardsCount {
nextIsLargeBlock = false
nextBlockIndex = 0
}
return nextBlockIndex, nextIsLargeBlock
}
func locateOffset(largeBlockLength, smallBlockLength int64, shardDatSize int64, offset int64) (blockIndex int, isLargeBlock bool, nLargeBlockRows int64, innerBlockOffset int64) {
largeRowSize := largeBlockLength * DataShardsCount
nLargeBlockRows = (shardDatSize - 1) / largeBlockLength