iceberg: keep split bins within target size (#8640)
This commit is contained in:
@@ -363,9 +363,9 @@ func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minF
|
||||
}
|
||||
|
||||
// splitOversizedBin splits a bin whose total size exceeds targetSize into
|
||||
// sub-bins that each stay under targetSize while meeting minFiles.
|
||||
// Entries are sorted by size descending before splitting so that large
|
||||
// files are placed first, improving bin packing efficiency.
|
||||
// sub-bins that stay under targetSize. Bins that cannot reach minFiles
|
||||
// without violating targetSize are left uncompacted rather than merged into
|
||||
// oversized bins.
|
||||
func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []compactionBin {
|
||||
// Sort largest-first for better packing.
|
||||
sorted := make([]iceberg.ManifestEntry, len(bin.Entries))
|
||||
@@ -394,41 +394,43 @@ func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []comp
|
||||
bins = append(bins, current)
|
||||
}
|
||||
|
||||
// Merge any bin with fewer than minFiles entries into its neighbor.
|
||||
// Repeat until no more merges are needed. This handles runts at the
|
||||
// end (from the split) and small leading bins (from largest-first
|
||||
// sorting placing single large files into their own bins).
|
||||
for changed := true; changed && len(bins) > 1; {
|
||||
changed = false
|
||||
for i := 0; i < len(bins); i++ {
|
||||
if len(bins[i].Entries) >= minFiles {
|
||||
var valid []compactionBin
|
||||
var pending []compactionBin
|
||||
for _, candidate := range bins {
|
||||
if len(candidate.Entries) >= minFiles {
|
||||
valid = append(valid, candidate)
|
||||
continue
|
||||
}
|
||||
// Pick the better neighbor to merge into.
|
||||
var target int
|
||||
if i == 0 {
|
||||
target = 1
|
||||
} else if i == len(bins)-1 {
|
||||
target = i - 1
|
||||
} else if bins[i-1].TotalSize <= bins[i+1].TotalSize {
|
||||
target = i - 1
|
||||
} else {
|
||||
target = i + 1
|
||||
pending = append(pending, candidate)
|
||||
}
|
||||
|
||||
// Try to fold entries from underfilled bins into valid bins when they fit.
|
||||
for _, runt := range pending {
|
||||
for _, entry := range runt.Entries {
|
||||
bestIdx := -1
|
||||
bestRemaining := int64(-1)
|
||||
entrySize := entry.DataFile().FileSizeBytes()
|
||||
for i := range valid {
|
||||
remaining := targetSize - valid[i].TotalSize - entrySize
|
||||
if remaining < 0 {
|
||||
continue
|
||||
}
|
||||
if bestIdx == -1 || remaining < bestRemaining {
|
||||
bestIdx = i
|
||||
bestRemaining = remaining
|
||||
}
|
||||
}
|
||||
if bestIdx >= 0 {
|
||||
valid[bestIdx].Entries = append(valid[bestIdx].Entries, entry)
|
||||
valid[bestIdx].TotalSize += entrySize
|
||||
}
|
||||
bins[target].Entries = append(bins[target].Entries, bins[i].Entries...)
|
||||
bins[target].TotalSize += bins[i].TotalSize
|
||||
bins = append(bins[:i], bins[i+1:]...)
|
||||
changed = true
|
||||
break // restart scan after structural change
|
||||
}
|
||||
}
|
||||
|
||||
// Final guard: if a single remaining bin has fewer than minFiles
|
||||
// (entire input too small), return nothing.
|
||||
if len(bins) == 1 && len(bins[0].Entries) < minFiles {
|
||||
if len(valid) == 0 {
|
||||
return nil
|
||||
}
|
||||
return bins
|
||||
return valid
|
||||
}
|
||||
|
||||
// partitionKey creates a string key from a partition map for grouping.
|
||||
|
||||
@@ -515,6 +515,59 @@ func TestBuildCompactionBinsMultiplePartitions(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitOversizedBinRespectsTargetSize(t *testing.T) {
|
||||
targetSize := int64(100)
|
||||
minFiles := 2
|
||||
|
||||
entries := makeTestEntries(t, []testEntrySpec{
|
||||
{path: "data/f1.parquet", size: 80, partition: map[int]any{}},
|
||||
{path: "data/f2.parquet", size: 80, partition: map[int]any{}},
|
||||
{path: "data/f3.parquet", size: 10, partition: map[int]any{}},
|
||||
{path: "data/f4.parquet", size: 10, partition: map[int]any{}},
|
||||
})
|
||||
|
||||
bins := splitOversizedBin(compactionBin{
|
||||
PartitionKey: "__unpartitioned__",
|
||||
Partition: map[int]any{},
|
||||
Entries: entries,
|
||||
TotalSize: 180,
|
||||
}, targetSize, minFiles)
|
||||
|
||||
if len(bins) == 0 {
|
||||
t.Fatal("expected at least one valid bin")
|
||||
}
|
||||
for i, bin := range bins {
|
||||
if bin.TotalSize > targetSize {
|
||||
t.Fatalf("bin %d exceeds target size: got %d want <= %d", i, bin.TotalSize, targetSize)
|
||||
}
|
||||
if len(bin.Entries) < minFiles {
|
||||
t.Fatalf("bin %d does not meet minFiles: got %d want >= %d", i, len(bin.Entries), minFiles)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitOversizedBinDropsImpossibleRunts(t *testing.T) {
|
||||
targetSize := int64(100)
|
||||
minFiles := 2
|
||||
|
||||
entries := makeTestEntries(t, []testEntrySpec{
|
||||
{path: "data/f1.parquet", size: 60, partition: map[int]any{}},
|
||||
{path: "data/f2.parquet", size: 60, partition: map[int]any{}},
|
||||
{path: "data/f3.parquet", size: 60, partition: map[int]any{}},
|
||||
})
|
||||
|
||||
bins := splitOversizedBin(compactionBin{
|
||||
PartitionKey: "__unpartitioned__",
|
||||
Partition: map[int]any{},
|
||||
Entries: entries,
|
||||
TotalSize: 180,
|
||||
}, targetSize, minFiles)
|
||||
|
||||
if len(bins) != 0 {
|
||||
t.Fatalf("expected no valid bins, got %d", len(bins))
|
||||
}
|
||||
}
|
||||
|
||||
type testEntrySpec struct {
|
||||
path string
|
||||
size int64
|
||||
|
||||
Reference in New Issue
Block a user