From 5f19e3259f81e9242c509a50117ed5c450ed6d16 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Sun, 15 Mar 2026 12:48:31 -0700
Subject: [PATCH] iceberg: keep split bins within target size (#8640)

---
 weed/plugin/worker/iceberg/compact.go      | 64 +++++++++++-----------
 weed/plugin/worker/iceberg/handler_test.go | 53 ++++++++++++++++++
 2 files changed, 86 insertions(+), 31 deletions(-)

diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go
index 62fa42ecb..40d9092eb 100644
--- a/weed/plugin/worker/iceberg/compact.go
+++ b/weed/plugin/worker/iceberg/compact.go
@@ -363,9 +363,9 @@ func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minF
 }
 
 // splitOversizedBin splits a bin whose total size exceeds targetSize into
-// sub-bins that each stay under targetSize while meeting minFiles.
-// Entries are sorted by size descending before splitting so that large
-// files are placed first, improving bin packing efficiency.
+// sub-bins that stay under targetSize. Bins that cannot reach minFiles
+// without violating targetSize are left uncompacted rather than merged into
+// oversized bins.
 func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []compactionBin {
 	// Sort largest-first for better packing.
 	sorted := make([]iceberg.ManifestEntry, len(bin.Entries))
@@ -394,41 +394,43 @@ func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []comp
 		bins = append(bins, current)
 	}
 
-	// Merge any bin with fewer than minFiles entries into its neighbor.
-	// Repeat until no more merges are needed. This handles runts at the
-	// end (from the split) and small leading bins (from largest-first
-	// sorting placing single large files into their own bins).
-	for changed := true; changed && len(bins) > 1; {
-		changed = false
-		for i := 0; i < len(bins); i++ {
-			if len(bins[i].Entries) >= minFiles {
-				continue
+	var valid []compactionBin
+	var pending []compactionBin
+	for _, candidate := range bins {
+		if len(candidate.Entries) >= minFiles {
+			valid = append(valid, candidate)
+			continue
+		}
+		pending = append(pending, candidate)
+	}
+
+	// Try to fold entries from underfilled bins into valid bins when they fit.
+	for _, runt := range pending {
+		for _, entry := range runt.Entries {
+			bestIdx := -1
+			bestRemaining := int64(-1)
+			entrySize := entry.DataFile().FileSizeBytes()
+			for i := range valid {
+				remaining := targetSize - valid[i].TotalSize - entrySize
+				if remaining < 0 {
+					continue
+				}
+				if bestIdx == -1 || remaining < bestRemaining {
+					bestIdx = i
+					bestRemaining = remaining
+				}
 			}
-			// Pick the better neighbor to merge into.
-			var target int
-			if i == 0 {
-				target = 1
-			} else if i == len(bins)-1 {
-				target = i - 1
-			} else if bins[i-1].TotalSize <= bins[i+1].TotalSize {
-				target = i - 1
-			} else {
-				target = i + 1
+			if bestIdx >= 0 {
+				valid[bestIdx].Entries = append(valid[bestIdx].Entries, entry)
+				valid[bestIdx].TotalSize += entrySize
 			}
-			bins[target].Entries = append(bins[target].Entries, bins[i].Entries...)
-			bins[target].TotalSize += bins[i].TotalSize
-			bins = append(bins[:i], bins[i+1:]...)
-			changed = true
-			break // restart scan after structural change
 		}
 	}
 
-	// Final guard: if a single remaining bin has fewer than minFiles
-	// (entire input too small), return nothing.
-	if len(bins) == 1 && len(bins[0].Entries) < minFiles {
+	if len(valid) == 0 {
 		return nil
 	}
-	return bins
+	return valid
 }
 
 // partitionKey creates a string key from a partition map for grouping.
diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go
index 7db6bb0dd..db7f5aa07 100644
--- a/weed/plugin/worker/iceberg/handler_test.go
+++ b/weed/plugin/worker/iceberg/handler_test.go
@@ -515,6 +515,59 @@ func TestBuildCompactionBinsMultiplePartitions(t *testing.T) {
 	}
 }
 
+func TestSplitOversizedBinRespectsTargetSize(t *testing.T) {
+	targetSize := int64(100)
+	minFiles := 2
+
+	entries := makeTestEntries(t, []testEntrySpec{
+		{path: "data/f1.parquet", size: 80, partition: map[int]any{}},
+		{path: "data/f2.parquet", size: 80, partition: map[int]any{}},
+		{path: "data/f3.parquet", size: 10, partition: map[int]any{}},
+		{path: "data/f4.parquet", size: 10, partition: map[int]any{}},
+	})
+
+	bins := splitOversizedBin(compactionBin{
+		PartitionKey: "__unpartitioned__",
+		Partition:    map[int]any{},
+		Entries:      entries,
+		TotalSize:    180,
+	}, targetSize, minFiles)
+
+	if len(bins) == 0 {
+		t.Fatal("expected at least one valid bin")
+	}
+	for i, bin := range bins {
+		if bin.TotalSize > targetSize {
+			t.Fatalf("bin %d exceeds target size: got %d want <= %d", i, bin.TotalSize, targetSize)
+		}
+		if len(bin.Entries) < minFiles {
+			t.Fatalf("bin %d does not meet minFiles: got %d want >= %d", i, len(bin.Entries), minFiles)
+		}
+	}
+}
+
+func TestSplitOversizedBinDropsImpossibleRunts(t *testing.T) {
+	targetSize := int64(100)
+	minFiles := 2
+
+	entries := makeTestEntries(t, []testEntrySpec{
+		{path: "data/f1.parquet", size: 60, partition: map[int]any{}},
+		{path: "data/f2.parquet", size: 60, partition: map[int]any{}},
+		{path: "data/f3.parquet", size: 60, partition: map[int]any{}},
+	})
+
+	bins := splitOversizedBin(compactionBin{
+		PartitionKey: "__unpartitioned__",
+		Partition:    map[int]any{},
+		Entries:      entries,
+		TotalSize:    180,
+	}, targetSize, minFiles)
+
+	if len(bins) != 0 {
+		t.Fatalf("expected no valid bins, got %d", len(bins))
+	}
+}
+
 type testEntrySpec struct {
 	path      string
 	size      int64