Files
seaweedFS/weed/plugin/worker/iceberg/compact.go
Chris Lu a00eddb525 Iceberg table maintenance Phase 3: multi-spec compaction, delete handling, and metrics (#8643)
* Add multi-partition-spec compaction and delete-aware compaction (Phase 3)

Multi-partition-spec compaction:
- Add SpecID to compactionBin struct and group by spec+partition key
- Remove the len(specIDs) > 1 skip that blocked spec-evolved tables
- Write per-spec manifests in compaction commit using specByID map
- Use per-bin PartitionSpec when calling NewDataFileBuilder

Delete-aware compaction:
- Add ApplyDeletes config (default: true) with readBoolConfig helper
- Implement position delete collection (file_path + pos Parquet columns)
- Implement equality delete collection (field ID to column mapping)
- Update mergeParquetFiles to filter rows via position deletes (binary
  search) and equality deletes (hash set lookup)
- Smart delete manifest carry-forward: drop when all data files compacted
- Fix EXISTING/DELETED entries to include sequence numbers

Tests for multi-spec bins, delete collection, merge filtering, and
end-to-end compaction with position/equality/mixed deletes.

* Add structured metrics and per-bin progress to iceberg maintenance

- Change return type of all four operations from (string, error) to
  (string, map[string]int64, error) with structured metric counts
  (files_merged, snapshots_expired, orphans_removed, duration_ms, etc.)
- Add onProgress callback to compactDataFiles for per-bin progress
- In Execute, pass progress callback that sends JobProgressUpdate with
  per-bin stage messages
- Accumulate per-operation metrics with dot-prefixed keys
  (e.g. compact.files_merged) into OutputValues on completion
- Update testing_api.go wrappers and integration test call sites
- Add tests: TestCompactDataFilesMetrics, TestExpireSnapshotsMetrics,
  TestExecuteCompletionOutputValues

* Address review feedback: group equality deletes by field IDs, use metric constants

- Group equality deletes by distinct equality_ids sets so different
  delete files with different equality columns are handled correctly
- Use length-prefixed type-aware encoding in buildEqualityKey to avoid
  ambiguity between types and collisions from null bytes
- Extract metric key strings into package-level constants

* Fix buildEqualityKey to use length-prefixed type-aware encoding

The previous implementation used plain String() concatenation with null
byte separators, which caused type ambiguity (int 123 vs string "123")
and separator collisions when values contain null bytes. Now each value
is serialized as "kind:length:value" for unambiguous composite keys.

This fix was missed in the prior cherry-pick due to a merge conflict.

* Address nitpick review comments

- Document patchManifestContentToDeletes workaround: explain that
  iceberg-go WriteManifest cannot create delete manifests, and note
  the fail-fast validation on pattern match
- Document makeTestEntries: note that specID field is ignored and
  callers should use makeTestEntriesWithSpec for multi-spec testing

* fmt

* Fix path normalization, manifest threshold, and artifact filename collisions

- Normalize file paths in position delete collection and lookup so that
  absolute S3 URLs and relative paths match correctly
- Fix rewriteManifests threshold check to count only data manifests
  (was including delete manifests in the count and metric)
- Add random suffix to artifact filenames in compactDataFiles and
  rewriteManifests to prevent collisions between concurrent runs
- Sort compaction bins by SpecID then PartitionKey for deterministic
  ordering across specs

* Fix pos delete read, deduplicate column resolution, minor cleanups

- Remove broken Column() guard in position delete reading that silently
  defaulted pos to 0; unconditionally extract Int64() instead
- Deduplicate column resolution in readEqualityDeleteFile by calling
  resolveEqualityColIndices instead of inlining the same logic
- Add warning log in readBoolConfig for unrecognized string values
- Fix CompactDataFiles call site in integration test to capture 3 return
  values

* Advance progress on all bins, deterministic manifest order, assert metrics

- Call onProgress for every bin iteration including skipped/failed bins
  so progress reporting never appears stalled
- Sort spec IDs before iterating specEntriesMap to produce deterministic
  manifest list ordering across runs
- Assert expected metric keys in CompactDataFiles integration test

---------

Co-authored-by: Copilot <copilot@github.com>
2026-03-15 19:18:14 -07:00

1087 lines
34 KiB
Go

package iceberg
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"path"
"sort"
"strings"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// compactionBin groups small data files from the same partition and spec for merging.
type compactionBin struct {
PartitionKey string
Partition map[int]any
SpecID int32
Entries []iceberg.ManifestEntry
TotalSize int64
}
// compactDataFiles reads manifests to find small Parquet data files, groups
// them by partition, reads and merges them using parquet-go, and commits new
// manifest entries.
func (h *Handler) compactDataFiles(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
config Config,
onProgress func(binIdx, totalBins int),
) (string, map[string]int64, error) {
start := time.Now()
meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
if err != nil {
return "", nil, fmt.Errorf("load metadata: %w", err)
}
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
return "no current snapshot", nil, nil
}
// Read manifest list
manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList)
if err != nil {
return "", nil, fmt.Errorf("read manifest list: %w", err)
}
manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData))
if err != nil {
return "", nil, fmt.Errorf("parse manifest list: %w", err)
}
// Separate data manifests from delete manifests.
var dataManifests, deleteManifests []iceberg.ManifestFile
for _, mf := range manifests {
if mf.ManifestContent() == iceberg.ManifestContentData {
dataManifests = append(dataManifests, mf)
} else {
deleteManifests = append(deleteManifests, mf)
}
}
// If delete manifests exist and apply_deletes is disabled (or not yet
// implemented for this code path), skip compaction to avoid producing
// incorrect results by dropping deletes.
if len(deleteManifests) > 0 && !config.ApplyDeletes {
return "compaction skipped: delete manifests present and apply_deletes is disabled", nil, nil
}
// Collect data file entries from data manifests
var allEntries []iceberg.ManifestEntry
for _, mf := range dataManifests {
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
if err != nil {
return "", nil, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
}
allEntries = append(allEntries, entries...)
}
// Collect delete entries if we need to apply deletes
var positionDeletes map[string][]int64
var eqDeleteGroups []equalityDeleteGroup
if config.ApplyDeletes && len(deleteManifests) > 0 {
var allDeleteEntries []iceberg.ManifestEntry
for _, mf := range deleteManifests {
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
if err != nil {
return "", nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
return "", nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), err)
}
allDeleteEntries = append(allDeleteEntries, entries...)
}
// Separate position and equality deletes
var posDeleteEntries, eqDeleteEntries []iceberg.ManifestEntry
for _, entry := range allDeleteEntries {
switch entry.DataFile().ContentType() {
case iceberg.EntryContentPosDeletes:
posDeleteEntries = append(posDeleteEntries, entry)
case iceberg.EntryContentEqDeletes:
eqDeleteEntries = append(eqDeleteEntries, entry)
}
}
if len(posDeleteEntries) > 0 {
positionDeletes, err = collectPositionDeletes(ctx, filerClient, bucketName, tablePath, posDeleteEntries)
if err != nil {
return "", nil, fmt.Errorf("collect position deletes: %w", err)
}
}
if len(eqDeleteEntries) > 0 {
eqDeleteGroups, err = collectEqualityDeletes(ctx, filerClient, bucketName, tablePath, eqDeleteEntries, meta.CurrentSchema())
if err != nil {
return "", nil, fmt.Errorf("collect equality deletes: %w", err)
}
}
}
// Build compaction bins: group small files by partition
// MinInputFiles is clamped by ParseConfig to [2, ...] so int conversion is safe.
bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, int(config.MinInputFiles))
if len(bins) == 0 {
return "no files eligible for compaction", nil, nil
}
// Build a lookup from spec ID to PartitionSpec for per-bin manifest writing.
specByID := make(map[int]iceberg.PartitionSpec)
for _, ps := range meta.PartitionSpecs() {
specByID[ps.ID()] = ps
}
schema := meta.CurrentSchema()
version := meta.Version()
snapshotID := currentSnap.SnapshotID
// Compute the snapshot ID for the commit up front so all manifest entries
// reference the same snapshot that will actually be committed.
newSnapID := time.Now().UnixMilli()
// Random suffix for artifact filenames to avoid collisions between
// concurrent compaction runs on different tables sharing a timestamp.
artifactSuffix := compactRandomSuffix()
// Process each bin: read source Parquet files, merge, write output
var newManifestEntries []iceberg.ManifestEntry
var deletedManifestEntries []iceberg.ManifestEntry
totalMerged := 0
entrySeqNum := func(entry iceberg.ManifestEntry) *int64 {
seqNum := entry.SequenceNum()
if seqNum < 0 {
return nil
}
return &seqNum
}
entryFileSeqNum := func(entry iceberg.ManifestEntry) *int64 {
if fileSeqNum := entry.FileSequenceNum(); fileSeqNum != nil {
value := *fileSeqNum
return &value
}
return entrySeqNum(entry)
}
metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data")
// Track written artifacts so we can clean them up if the commit fails.
type artifact struct {
dir, fileName string
}
var writtenArtifacts []artifact
committed := false
defer func() {
if committed || len(writtenArtifacts) == 0 {
return
}
// Use a detached context so cleanup completes even if ctx was canceled.
cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for _, a := range writtenArtifacts {
if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil {
glog.Warningf("iceberg compact: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err)
}
}
}()
for binIdx, bin := range bins {
select {
case <-ctx.Done():
return "", nil, ctx.Err()
default:
}
mergedFileName := fmt.Sprintf("compact-%d-%d-%s-%d.parquet", snapshotID, newSnapID, artifactSuffix, binIdx)
mergedFilePath := path.Join("data", mergedFileName)
mergedData, recordCount, err := mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries, positionDeletes, eqDeleteGroups, schema)
if err != nil {
glog.Warningf("iceberg compact: failed to merge bin %d (%d files): %v", binIdx, len(bin.Entries), err)
goto binDone
}
// Write merged file to filer
if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil {
return "", nil, fmt.Errorf("ensure data dir: %w", err)
}
if err := saveFilerFile(ctx, filerClient, dataDir, mergedFileName, mergedData); err != nil {
return "", nil, fmt.Errorf("save merged file: %w", err)
}
// Use the partition spec matching this bin's spec ID
{
binSpec, ok := specByID[int(bin.SpecID)]
if !ok {
glog.Warningf("iceberg compact: spec %d not found for bin %d, skipping", bin.SpecID, binIdx)
_ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName)
goto binDone
}
// Create new DataFile entry for the merged file
dfBuilder, err := iceberg.NewDataFileBuilder(
binSpec,
iceberg.EntryContentData,
mergedFilePath,
iceberg.ParquetFile,
bin.Partition,
nil, nil,
recordCount,
int64(len(mergedData)),
)
if err != nil {
glog.Warningf("iceberg compact: failed to build data file entry for bin %d: %v", binIdx, err)
_ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName)
goto binDone
}
writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: mergedFileName})
newEntry := iceberg.NewManifestEntry(
iceberg.EntryStatusADDED,
&newSnapID,
nil, nil,
dfBuilder.Build(),
)
newManifestEntries = append(newManifestEntries, newEntry)
// Mark original entries as deleted
for _, entry := range bin.Entries {
delEntry := iceberg.NewManifestEntry(
iceberg.EntryStatusDELETED,
&newSnapID,
entrySeqNum(entry), entryFileSeqNum(entry),
entry.DataFile(),
)
deletedManifestEntries = append(deletedManifestEntries, delEntry)
}
totalMerged += len(bin.Entries)
}
binDone:
if onProgress != nil {
onProgress(binIdx, len(bins))
}
}
if len(newManifestEntries) == 0 {
return "no bins successfully compacted", nil, nil
}
// Build entries for the new manifests:
// - ADDED entries for merged files
// - DELETED entries for original files
// - EXISTING entries for files that weren't compacted
compactedPaths := make(map[string]struct{})
for _, entry := range deletedManifestEntries {
compactedPaths[entry.DataFile().FilePath()] = struct{}{}
}
// Group all manifest entries by spec ID for per-spec manifest writing.
type specEntries struct {
specID int32
entries []iceberg.ManifestEntry
}
specEntriesMap := make(map[int32]*specEntries)
addToSpec := func(specID int32, entry iceberg.ManifestEntry) {
se, ok := specEntriesMap[specID]
if !ok {
se = &specEntries{specID: specID}
specEntriesMap[specID] = se
}
se.entries = append(se.entries, entry)
}
// New and deleted entries carry the spec ID from their bin
for _, entry := range newManifestEntries {
addToSpec(entry.DataFile().SpecID(), entry)
}
for _, entry := range deletedManifestEntries {
addToSpec(entry.DataFile().SpecID(), entry)
}
// Existing entries that weren't compacted
for _, entry := range allEntries {
if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted {
existingEntry := iceberg.NewManifestEntry(
iceberg.EntryStatusEXISTING,
func() *int64 { id := entry.SnapshotID(); return &id }(),
entrySeqNum(entry), entryFileSeqNum(entry),
entry.DataFile(),
)
addToSpec(entry.DataFile().SpecID(), existingEntry)
}
}
// Write one manifest per spec ID, iterating in sorted order for
// deterministic manifest list construction.
sortedSpecIDs := make([]int32, 0, len(specEntriesMap))
for sid := range specEntriesMap {
sortedSpecIDs = append(sortedSpecIDs, sid)
}
sort.Slice(sortedSpecIDs, func(i, j int) bool { return sortedSpecIDs[i] < sortedSpecIDs[j] })
var allManifests []iceberg.ManifestFile
for _, sid := range sortedSpecIDs {
se := specEntriesMap[sid]
ps, ok := specByID[int(se.specID)]
if !ok {
return "", nil, fmt.Errorf("partition spec %d not found in table metadata", se.specID)
}
var manifestBuf bytes.Buffer
manifestFileName := fmt.Sprintf("compact-%d-%s-spec%d.avro", newSnapID, artifactSuffix, se.specID)
newManifest, err := iceberg.WriteManifest(
path.Join("metadata", manifestFileName),
&manifestBuf,
version,
ps,
schema,
newSnapID,
se.entries,
)
if err != nil {
return "", nil, fmt.Errorf("write compact manifest for spec %d: %w", se.specID, err)
}
if err := saveFilerFile(ctx, filerClient, metaDir, manifestFileName, manifestBuf.Bytes()); err != nil {
return "", nil, fmt.Errorf("save compact manifest for spec %d: %w", se.specID, err)
}
writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestFileName})
allManifests = append(allManifests, newManifest)
}
// Carry forward delete manifests only if deletes were NOT applied.
// When deletes were applied, they've been consumed during the merge.
// Position deletes reference specific data files — if all those files
// were compacted, the deletes are fully consumed. Equality deletes
// apply broadly, so they're only consumed if all data files were compacted.
if !config.ApplyDeletes || (len(positionDeletes) == 0 && len(eqDeleteGroups) == 0) {
for _, mf := range deleteManifests {
allManifests = append(allManifests, mf)
}
} else {
// Check if any non-compacted data files remain
hasUncompactedFiles := false
for _, entry := range allEntries {
if _, compacted := compactedPaths[entry.DataFile().FilePath()]; !compacted {
hasUncompactedFiles = true
break
}
}
if hasUncompactedFiles {
// Some files weren't compacted — carry forward delete manifests
// since deletes may still apply to those files.
for _, mf := range deleteManifests {
allManifests = append(allManifests, mf)
}
}
// If all files were compacted, deletes are fully consumed — don't carry forward.
}
// Write new manifest list
var manifestListBuf bytes.Buffer
seqNum := currentSnap.SequenceNumber + 1
err = iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests)
if err != nil {
return "", nil, fmt.Errorf("write compact manifest list: %w", err)
}
manifestListFileName := fmt.Sprintf("snap-%d-%s.avro", newSnapID, artifactSuffix)
if err := saveFilerFile(ctx, filerClient, metaDir, manifestListFileName, manifestListBuf.Bytes()); err != nil {
return "", nil, fmt.Errorf("save compact manifest list: %w", err)
}
writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListFileName})
// Commit: add new snapshot and update main branch ref
manifestListLocation := path.Join("metadata", manifestListFileName)
err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error {
// Guard: verify table head hasn't advanced since we planned.
cs := currentMeta.CurrentSnapshot()
if cs == nil || cs.SnapshotID != snapshotID {
return errStalePlan
}
newSnapshot := &table.Snapshot{
SnapshotID: newSnapID,
ParentSnapshotID: &snapshotID,
SequenceNumber: seqNum,
TimestampMs: newSnapID,
ManifestList: manifestListLocation,
Summary: &table.Summary{
Operation: table.OpReplace,
Properties: map[string]string{
"maintenance": "compact_data_files",
"merged-files": fmt.Sprintf("%d", totalMerged),
"new-files": fmt.Sprintf("%d", len(newManifestEntries)),
"compaction-bins": fmt.Sprintf("%d", len(bins)),
},
},
SchemaID: func() *int {
id := schema.ID
return &id
}(),
}
if err := builder.AddSnapshot(newSnapshot); err != nil {
return err
}
return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef)
})
if err != nil {
return "", nil, fmt.Errorf("commit compaction: %w", err)
}
committed = true
metrics := map[string]int64{
MetricFilesMerged: int64(totalMerged),
MetricFilesWritten: int64(len(newManifestEntries)),
MetricBins: int64(len(bins)),
MetricDurationMs: time.Since(start).Milliseconds(),
}
return fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)), metrics, nil
}
// buildCompactionBins groups small data files by partition for bin-packing.
// A file is "small" if it's below targetSize. A bin must have at least
// minFiles entries to be worth compacting.
func buildCompactionBins(entries []iceberg.ManifestEntry, targetSize int64, minFiles int) []compactionBin {
if minFiles < 2 {
minFiles = 2
}
// Group entries by spec ID + partition key so that files from different
// partition specs are never mixed in the same compaction bin.
groups := make(map[string]*compactionBin)
for _, entry := range entries {
df := entry.DataFile()
if df.FileFormat() != iceberg.ParquetFile {
continue
}
if df.FileSizeBytes() >= targetSize {
continue
}
partKey := partitionKey(df.Partition())
groupKey := fmt.Sprintf("spec%d\x00%s", df.SpecID(), partKey)
bin, ok := groups[groupKey]
if !ok {
bin = &compactionBin{
PartitionKey: partKey,
Partition: df.Partition(),
SpecID: df.SpecID(),
}
groups[groupKey] = bin
}
bin.Entries = append(bin.Entries, entry)
bin.TotalSize += df.FileSizeBytes()
}
// Filter to bins with enough files, splitting oversized bins
var result []compactionBin
for _, bin := range groups {
if len(bin.Entries) < minFiles {
continue
}
if bin.TotalSize <= targetSize {
result = append(result, *bin)
} else {
result = append(result, splitOversizedBin(*bin, targetSize, minFiles)...)
}
}
// Sort by spec ID then partition key for deterministic order
sort.Slice(result, func(i, j int) bool {
if result[i].SpecID != result[j].SpecID {
return result[i].SpecID < result[j].SpecID
}
return result[i].PartitionKey < result[j].PartitionKey
})
return result
}
// splitOversizedBin splits a bin whose total size exceeds targetSize into
// sub-bins that stay under targetSize. Bins that cannot reach minFiles
// without violating targetSize are left uncompacted rather than merged into
// oversized bins.
func splitOversizedBin(bin compactionBin, targetSize int64, minFiles int) []compactionBin {
// Sort largest-first for better packing.
sorted := make([]iceberg.ManifestEntry, len(bin.Entries))
copy(sorted, bin.Entries)
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].DataFile().FileSizeBytes() > sorted[j].DataFile().FileSizeBytes()
})
var bins []compactionBin
current := compactionBin{
PartitionKey: bin.PartitionKey,
Partition: bin.Partition,
SpecID: bin.SpecID,
}
for _, entry := range sorted {
if current.TotalSize > 0 && current.TotalSize+entry.DataFile().FileSizeBytes() > targetSize {
bins = append(bins, current)
current = compactionBin{
PartitionKey: bin.PartitionKey,
Partition: bin.Partition,
SpecID: bin.SpecID,
}
}
current.Entries = append(current.Entries, entry)
current.TotalSize += entry.DataFile().FileSizeBytes()
}
if len(current.Entries) > 0 {
bins = append(bins, current)
}
var valid []compactionBin
var pending []compactionBin
for _, candidate := range bins {
if len(candidate.Entries) >= minFiles {
valid = append(valid, candidate)
continue
}
pending = append(pending, candidate)
}
// Try to fold entries from underfilled bins into valid bins when they fit.
for _, runt := range pending {
for _, entry := range runt.Entries {
bestIdx := -1
bestRemaining := int64(-1)
entrySize := entry.DataFile().FileSizeBytes()
for i := range valid {
remaining := targetSize - valid[i].TotalSize - entrySize
if remaining < 0 {
continue
}
if bestIdx == -1 || remaining < bestRemaining {
bestIdx = i
bestRemaining = remaining
}
}
if bestIdx >= 0 {
valid[bestIdx].Entries = append(valid[bestIdx].Entries, entry)
valid[bestIdx].TotalSize += entrySize
}
}
}
if len(valid) == 0 {
return nil
}
return valid
}
// partitionKey creates a string key from a partition map for grouping.
// Values are JSON-encoded to avoid ambiguity when values contain commas or '='.
func partitionKey(partition map[int]any) string {
if len(partition) == 0 {
return "__unpartitioned__"
}
// Sort field IDs for deterministic key
ids := make([]int, 0, len(partition))
for id := range partition {
ids = append(ids, id)
}
sort.Ints(ids)
var parts []string
for _, id := range ids {
v, err := json.Marshal(partition[id])
if err != nil {
v = []byte(fmt.Sprintf("%x", fmt.Sprintf("%v", partition[id])))
}
parts = append(parts, fmt.Sprintf("%d=%s", id, v))
}
return strings.Join(parts, "\x00")
}
// collectPositionDeletes reads position delete Parquet files and returns a map
// from normalized data file path to sorted row positions that should be deleted.
// Paths are normalized so that absolute S3 URLs and relative paths match.
func collectPositionDeletes(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
deleteEntries []iceberg.ManifestEntry,
) (map[string][]int64, error) {
result := make(map[string][]int64)
for _, entry := range deleteEntries {
if entry.DataFile().ContentType() != iceberg.EntryContentPosDeletes {
continue
}
fileDeletes, err := readPositionDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath())
if err != nil {
return nil, fmt.Errorf("read position delete file %s: %w", entry.DataFile().FilePath(), err)
}
for filePath, positions := range fileDeletes {
normalized := normalizeIcebergPath(filePath, bucketName, tablePath)
result[normalized] = append(result[normalized], positions...)
}
}
// Sort positions for each file (binary search during filtering)
for filePath := range result {
sort.Slice(result[filePath], func(i, j int) bool {
return result[filePath][i] < result[filePath][j]
})
}
return result, nil
}
// readPositionDeleteFile reads a position delete Parquet file and returns a map
// from data file path to row positions. The file must have columns "file_path"
// (string) and "pos" (int32 or int64).
func readPositionDeleteFile(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath, filePath string,
) (map[string][]int64, error) {
data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, filePath)
if err != nil {
return nil, err
}
reader := parquet.NewReader(bytes.NewReader(data))
defer reader.Close()
pqSchema := reader.Schema()
filePathIdx := -1
posIdx := -1
for i, col := range pqSchema.Columns() {
name := strings.Join(col, ".")
switch name {
case "file_path":
filePathIdx = i
case "pos":
posIdx = i
}
}
if filePathIdx < 0 || posIdx < 0 {
return nil, fmt.Errorf("position delete file %s missing required columns (file_path=%d, pos=%d)", filePath, filePathIdx, posIdx)
}
result := make(map[string][]int64)
rows := make([]parquet.Row, 256)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
n, readErr := reader.ReadRows(rows)
for i := 0; i < n; i++ {
row := rows[i]
fp := row[filePathIdx].String()
pos := row[posIdx].Int64()
result[fp] = append(result[fp], pos)
}
if readErr != nil {
if readErr == io.EOF {
break
}
return nil, readErr
}
}
return result, nil
}
// equalityDeleteGroup holds a set of delete keys for a specific set of equality field IDs.
// Different equality delete files may use different field IDs, so deletes are grouped.
type equalityDeleteGroup struct {
FieldIDs []int
Keys map[string]struct{}
}
// collectEqualityDeletes reads equality delete Parquet files and returns groups
// of delete keys, one per distinct set of equality field IDs. This correctly
// handles the case where different delete files use different equality columns.
func collectEqualityDeletes(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
deleteEntries []iceberg.ManifestEntry,
schema *iceberg.Schema,
) ([]equalityDeleteGroup, error) {
type groupState struct {
fieldIDs []int
keys map[string]struct{}
}
groups := make(map[string]*groupState)
for _, entry := range deleteEntries {
if entry.DataFile().ContentType() != iceberg.EntryContentEqDeletes {
continue
}
eqFieldIDs := entry.DataFile().EqualityFieldIDs()
if len(eqFieldIDs) == 0 {
continue
}
groupKey := fmt.Sprint(eqFieldIDs)
gs, ok := groups[groupKey]
if !ok {
gs = &groupState{fieldIDs: eqFieldIDs, keys: make(map[string]struct{})}
groups[groupKey] = gs
}
keys, err := readEqualityDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath(), eqFieldIDs, schema)
if err != nil {
return nil, fmt.Errorf("read equality delete file %s: %w", entry.DataFile().FilePath(), err)
}
for k := range keys {
gs.keys[k] = struct{}{}
}
}
result := make([]equalityDeleteGroup, 0, len(groups))
for _, gs := range groups {
result = append(result, equalityDeleteGroup{FieldIDs: gs.fieldIDs, Keys: gs.keys})
}
return result, nil
}
// readEqualityDeleteFile reads an equality delete Parquet file and returns a set
// of composite keys built from the specified field IDs. The Iceberg schema is used
// to map field IDs to column names, which are then looked up in the Parquet schema.
func readEqualityDeleteFile(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath, filePath string,
fieldIDs []int,
icebergSchema *iceberg.Schema,
) (map[string]struct{}, error) {
data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, filePath)
if err != nil {
return nil, err
}
reader := parquet.NewReader(bytes.NewReader(data))
defer reader.Close()
colIndices, err := resolveEqualityColIndices(reader.Schema(), fieldIDs, icebergSchema)
if err != nil {
return nil, fmt.Errorf("resolve columns in %s: %w", filePath, err)
}
result := make(map[string]struct{})
rows := make([]parquet.Row, 256)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
n, readErr := reader.ReadRows(rows)
for i := 0; i < n; i++ {
key := buildEqualityKey(rows[i], colIndices)
result[key] = struct{}{}
}
if readErr != nil {
if readErr == io.EOF {
break
}
return nil, readErr
}
}
return result, nil
}
// buildEqualityKey builds a composite string key from specific column values
// in a row. Each value is serialized as "kind:length:value" to avoid ambiguity
// between types (e.g., int 123 vs string "123") and to prevent collisions from
// values containing separator characters.
func buildEqualityKey(row parquet.Row, colIndices []int) string {
if len(colIndices) == 1 {
v := row[colIndices[0]]
s := v.String()
return fmt.Sprintf("%d:%d:%s", v.Kind(), len(s), s)
}
var b strings.Builder
for _, idx := range colIndices {
v := row[idx]
s := v.String()
fmt.Fprintf(&b, "%d:%d:%s", v.Kind(), len(s), s)
}
return b.String()
}
// resolveEqualityColIndices maps Iceberg field IDs to Parquet column indices.
func resolveEqualityColIndices(pqSchema *parquet.Schema, fieldIDs []int, icebergSchema *iceberg.Schema) ([]int, error) {
if len(fieldIDs) == 0 {
return nil, nil
}
colNameToIdx := make(map[string]int)
for i, col := range pqSchema.Columns() {
colNameToIdx[strings.Join(col, ".")] = i
}
indices := make([]int, len(fieldIDs))
for i, fid := range fieldIDs {
field, ok := icebergSchema.FindFieldByID(fid)
if !ok {
return nil, fmt.Errorf("field ID %d not found in iceberg schema", fid)
}
idx, ok := colNameToIdx[field.Name]
if !ok {
return nil, fmt.Errorf("column %q (field ID %d) not found in parquet schema", field.Name, fid)
}
indices[i] = idx
}
return indices, nil
}
// mergeParquetFiles reads multiple small Parquet files and merges them into
// a single Parquet file, optionally filtering out rows matching position or
// equality deletes. Files are processed one at a time to keep memory usage
// proportional to a single input file plus the output buffer.
func mergeParquetFiles(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
entries []iceberg.ManifestEntry,
positionDeletes map[string][]int64,
eqDeleteGroups []equalityDeleteGroup,
icebergSchema *iceberg.Schema,
) ([]byte, int64, error) {
if len(entries) == 0 {
return nil, 0, fmt.Errorf("no entries to merge")
}
// Load the first file to obtain the schema for the writer.
firstData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entries[0].DataFile().FilePath())
if err != nil {
return nil, 0, fmt.Errorf("read parquet file %s: %w", entries[0].DataFile().FilePath(), err)
}
firstReader := parquet.NewReader(bytes.NewReader(firstData))
parquetSchema := firstReader.Schema()
if parquetSchema == nil {
firstReader.Close()
return nil, 0, fmt.Errorf("no parquet schema found in %s", entries[0].DataFile().FilePath())
}
// Resolve equality delete column indices for each group.
type resolvedEqGroup struct {
colIndices []int
keys map[string]struct{}
}
var resolvedEqGroups []resolvedEqGroup
if len(eqDeleteGroups) > 0 && icebergSchema != nil {
for _, g := range eqDeleteGroups {
indices, resolveErr := resolveEqualityColIndices(parquetSchema, g.FieldIDs, icebergSchema)
if resolveErr != nil {
firstReader.Close()
return nil, 0, fmt.Errorf("resolve equality columns: %w", resolveErr)
}
resolvedEqGroups = append(resolvedEqGroups, resolvedEqGroup{colIndices: indices, keys: g.Keys})
}
}
var outputBuf bytes.Buffer
writer := parquet.NewWriter(&outputBuf, parquetSchema)
var totalRows int64
rows := make([]parquet.Row, 256)
hasEqDeletes := len(resolvedEqGroups) > 0
// drainReader streams rows from reader into writer, filtering out deleted
// rows. source is the data file path (used for error messages and
// position delete lookups).
drainReader := func(reader *parquet.Reader, source string) error {
defer reader.Close()
// Normalize source path so it matches the normalized keys in positionDeletes.
normalizedSource := normalizeIcebergPath(source, bucketName, tablePath)
posDeletes := positionDeletes[normalizedSource]
posDeleteIdx := 0
var absolutePos int64
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
n, readErr := reader.ReadRows(rows)
if n > 0 {
// Filter rows if we have any deletes
if len(posDeletes) > 0 || hasEqDeletes {
writeIdx := 0
for i := 0; i < n; i++ {
rowPos := absolutePos + int64(i)
// Check position deletes (sorted, so advance index)
if len(posDeletes) > 0 {
for posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] < rowPos {
posDeleteIdx++
}
if posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] == rowPos {
posDeleteIdx++
continue // skip this row
}
}
// Check equality deletes — each group independently
deleted := false
for _, g := range resolvedEqGroups {
key := buildEqualityKey(rows[i], g.colIndices)
if _, ok := g.keys[key]; ok {
deleted = true
break
}
}
if deleted {
continue // skip this row
}
rows[writeIdx] = rows[i]
writeIdx++
}
absolutePos += int64(n)
if writeIdx > 0 {
if _, writeErr := writer.WriteRows(rows[:writeIdx]); writeErr != nil {
return fmt.Errorf("write rows from %s: %w", source, writeErr)
}
totalRows += int64(writeIdx)
}
} else {
if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil {
return fmt.Errorf("write rows from %s: %w", source, writeErr)
}
totalRows += int64(n)
}
}
if readErr != nil {
if readErr == io.EOF {
return nil
}
return fmt.Errorf("read rows from %s: %w", source, readErr)
}
}
}
// Drain the first file.
firstSource := entries[0].DataFile().FilePath()
if err := drainReader(firstReader, firstSource); err != nil {
writer.Close()
return nil, 0, err
}
firstData = nil // allow GC
// Process remaining files one at a time.
for _, entry := range entries[1:] {
select {
case <-ctx.Done():
writer.Close()
return nil, 0, ctx.Err()
default:
}
data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath())
if err != nil {
writer.Close()
return nil, 0, fmt.Errorf("read parquet file %s: %w", entry.DataFile().FilePath(), err)
}
reader := parquet.NewReader(bytes.NewReader(data))
if !schemasEqual(parquetSchema, reader.Schema()) {
reader.Close()
writer.Close()
return nil, 0, fmt.Errorf("schema mismatch in %s: cannot merge files with different schemas", entry.DataFile().FilePath())
}
if err := drainReader(reader, entry.DataFile().FilePath()); err != nil {
writer.Close()
return nil, 0, err
}
// data goes out of scope here, eligible for GC before next iteration.
}
if err := writer.Close(); err != nil {
return nil, 0, fmt.Errorf("close writer: %w", err)
}
return outputBuf.Bytes(), totalRows, nil
}
// compactRandomSuffix returns a short random hex string for use in artifact
// filenames to prevent collisions between concurrent runs.
func compactRandomSuffix() string {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
return fmt.Sprintf("%x", time.Now().UnixNano()&0xFFFFFFFF)
}
return hex.EncodeToString(b)
}
// schemasEqual compares two parquet schemas structurally.
func schemasEqual(a, b *parquet.Schema) bool {
if a == b {
return true
}
if a == nil || b == nil {
return false
}
return parquet.EqualNodes(a, b)
}
// ensureFilerDir ensures a directory exists in the filer.
func ensureFilerDir(ctx context.Context, client filer_pb.SeaweedFilerClient, dirPath string) error {
parentDir := path.Dir(dirPath)
dirName := path.Base(dirPath)
_, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{
Directory: parentDir,
Name: dirName,
})
if err == nil {
return nil // already exists
}
if !errors.Is(err, filer_pb.ErrNotFound) && status.Code(err) != codes.NotFound {
return fmt.Errorf("lookup dir %s: %w", dirPath, err)
}
resp, createErr := client.CreateEntry(ctx, &filer_pb.CreateEntryRequest{
Directory: parentDir,
Entry: &filer_pb.Entry{
Name: dirName,
IsDirectory: true,
Attributes: &filer_pb.FuseAttributes{
Mtime: time.Now().Unix(),
Crtime: time.Now().Unix(),
FileMode: uint32(0755),
},
},
})
if createErr != nil {
return createErr
}
if resp.Error != "" && !strings.Contains(resp.Error, "exist") {
return fmt.Errorf("create dir %s: %s", dirPath, resp.Error)
}
return nil
}