iceberg: add delete file rewrite maintenance (#8664)

* iceberg: add delete file rewrite maintenance

* iceberg: preserve untouched delete files during rewrites

* iceberg: share detection threshold defaults

* iceberg: add partition-scoped maintenance filters (#8665)

* iceberg: add partition-scoped maintenance filters

* iceberg: tighten where-filter partition matching
Chris Lu
2026-03-16 21:11:09 -07:00
committed by GitHub
parent a3717cd4b5
commit e5c0889473
11 changed files with 2138 additions and 115 deletions

View File

@@ -48,6 +48,10 @@ func (h *Handler) compactDataFiles(
if err != nil {
return "", nil, fmt.Errorf("load metadata: %w", err)
}
predicate, err := parsePartitionPredicate(config.Where, meta)
if err != nil {
return "", nil, err
}
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
@@ -95,6 +99,8 @@ func (h *Handler) compactDataFiles(
allEntries = append(allEntries, entries...)
}
specsByID := specByID(meta)
// Collect delete entries if we need to apply deletes
var positionDeletes map[string][]int64
var eqDeleteGroups []equalityDeleteGroup
@@ -112,9 +118,23 @@ func (h *Handler) compactDataFiles(
allDeleteEntries = append(allDeleteEntries, entries...)
}
// Separate position and equality deletes
// Separate position and equality deletes, filtering by partition
// predicate so out-of-scope deletes don't affect the merge.
var posDeleteEntries, eqDeleteEntries []iceberg.ManifestEntry
for _, entry := range allDeleteEntries {
if predicate != nil {
spec, ok := specsByID[int(entry.DataFile().SpecID())]
if !ok {
continue
}
match, err := predicate.Matches(spec, entry.DataFile().Partition())
if err != nil {
return "", nil, err
}
if !match {
continue
}
}
switch entry.DataFile().ContentType() {
case iceberg.EntryContentPosDeletes:
posDeleteEntries = append(posDeleteEntries, entry)
@@ -138,18 +158,37 @@ func (h *Handler) compactDataFiles(
}
}
// Build compaction bins: group small files by partition
// MinInputFiles is clamped by ParseConfig to [2, ...] so int conversion is safe.
bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, int(config.MinInputFiles))
candidateEntries := allEntries
if predicate != nil {
candidateEntries = make([]iceberg.ManifestEntry, 0, len(allEntries))
for _, entry := range allEntries {
spec, ok := specsByID[int(entry.DataFile().SpecID())]
if !ok {
continue
}
match, err := predicate.Matches(spec, entry.DataFile().Partition())
if err != nil {
return "", nil, err
}
if match {
candidateEntries = append(candidateEntries, entry)
}
}
}
minInputFiles, err := compactionMinInputFiles(config.MinInputFiles)
if err != nil {
return "", nil, err
}
// Build compaction bins: group small data files by partition.
bins := buildCompactionBins(candidateEntries, config.TargetFileSizeBytes, minInputFiles)
if len(bins) == 0 {
return "no files eligible for compaction", nil, nil
}
// Build a lookup from spec ID to PartitionSpec for per-bin manifest writing.
specByID := make(map[int]iceberg.PartitionSpec)
for _, ps := range meta.PartitionSpecs() {
specByID[ps.ID()] = ps
}
specLookup := specsByID
schema := meta.CurrentSchema()
version := meta.Version()
@@ -233,7 +272,7 @@ func (h *Handler) compactDataFiles(
// Use the partition spec matching this bin's spec ID
{
binSpec, ok := specByID[int(bin.SpecID)]
binSpec, ok := specLookup[int(bin.SpecID)]
if !ok {
glog.Warningf("iceberg compact: spec %d not found for bin %d, skipping", bin.SpecID, binIdx)
_ = deleteFilerFile(ctx, filerClient, dataDir, mergedFileName)
@@ -347,7 +386,7 @@ func (h *Handler) compactDataFiles(
var allManifests []iceberg.ManifestFile
for _, sid := range sortedSpecIDs {
se := specEntriesMap[sid]
ps, ok := specByID[int(se.specID)]
ps, ok := specLookup[int(se.specID)]
if !ok {
return "", nil, fmt.Errorf("partition spec %d not found in table metadata", se.specID)
}

View File

@@ -18,6 +18,10 @@ const (
defaultMaxCommitRetries = 5
defaultTargetFileSizeMB = 256
defaultMinInputFiles = 5
defaultDeleteTargetFileSizeMB = 64
defaultDeleteMinInputFiles = 2
defaultDeleteMaxGroupSizeMB = 256
defaultDeleteMaxOutputFiles = 8
defaultMinManifestsToRewrite = 5
minManifestsToRewrite = 2
defaultOperations = "all"
@@ -30,6 +34,11 @@ const (
MetricFilesDeleted = "files_deleted"
MetricOrphansRemoved = "orphans_removed"
MetricManifestsRewritten = "manifests_rewritten"
MetricDeleteFilesRewritten = "delete_files_rewritten"
MetricDeleteFilesWritten = "delete_files_written"
MetricDeleteBytesRewritten = "delete_bytes_rewritten"
MetricDeleteGroupsPlanned = "delete_groups_planned"
MetricDeleteGroupsSkipped = "delete_groups_skipped"
MetricEntriesTotal = "entries_total"
MetricDurationMs = "duration_ms"
)
@@ -42,9 +51,17 @@ type Config struct {
MaxCommitRetries int64
TargetFileSizeBytes int64
MinInputFiles int64
DeleteTargetFileSizeBytes int64
DeleteMinInputFiles int64
DeleteMaxFileGroupSizeBytes int64
DeleteMaxOutputFiles int64
MinManifestsToRewrite int64
Operations string
ApplyDeletes bool
Where string
RewriteStrategy string
SortFields string
SortMaxInputBytes int64
}
// ParseConfig extracts an iceberg maintenance Config from plugin config values.
@@ -57,48 +74,83 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config {
MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries),
TargetFileSizeBytes: readInt64Config(values, "target_file_size_mb", defaultTargetFileSizeMB) * 1024 * 1024,
MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles),
DeleteTargetFileSizeBytes: readInt64Config(values, "delete_target_file_size_mb", defaultDeleteTargetFileSizeMB) * 1024 * 1024,
DeleteMinInputFiles: readInt64Config(values, "delete_min_input_files", defaultDeleteMinInputFiles),
DeleteMaxFileGroupSizeBytes: readInt64Config(values, "delete_max_file_group_size_mb", defaultDeleteMaxGroupSizeMB) * 1024 * 1024,
DeleteMaxOutputFiles: readInt64Config(values, "delete_max_output_files", defaultDeleteMaxOutputFiles),
MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite),
Operations: readStringConfig(values, "operations", defaultOperations),
ApplyDeletes: readBoolConfig(values, "apply_deletes", true),
Where: strings.TrimSpace(readStringConfig(values, "where", "")),
RewriteStrategy: strings.TrimSpace(strings.ToLower(readStringConfig(values, "rewrite_strategy", "binpack"))),
SortFields: strings.TrimSpace(readStringConfig(values, "sort_fields", "")),
SortMaxInputBytes: readInt64Config(values, "sort_max_input_mb", 0) * 1024 * 1024,
}
// Clamp to safe minimums using the default constants
// Clamp the fields that are always defaulted by worker config parsing.
if cfg.SnapshotRetentionHours <= 0 {
cfg.SnapshotRetentionHours = defaultSnapshotRetentionHours
}
if cfg.MaxSnapshotsToKeep <= 0 {
cfg.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep
}
if cfg.OrphanOlderThanHours <= 0 {
cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours
}
if cfg.MaxCommitRetries <= 0 {
cfg.MaxCommitRetries = defaultMaxCommitRetries
}
cfg = applyThresholdDefaults(cfg)
if cfg.RewriteStrategy == "" {
cfg.RewriteStrategy = "binpack"
}
if cfg.RewriteStrategy != "binpack" && cfg.RewriteStrategy != "sort" {
cfg.RewriteStrategy = "binpack"
}
if cfg.SortMaxInputBytes < 0 {
cfg.SortMaxInputBytes = 0
}
return cfg
}
func applyThresholdDefaults(cfg Config) Config {
if cfg.OrphanOlderThanHours <= 0 {
cfg.OrphanOlderThanHours = defaultOrphanOlderThanHours
}
if cfg.TargetFileSizeBytes <= 0 {
cfg.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024
}
if cfg.MinInputFiles < 2 {
cfg.MinInputFiles = defaultMinInputFiles
}
if cfg.DeleteTargetFileSizeBytes <= 0 {
cfg.DeleteTargetFileSizeBytes = defaultDeleteTargetFileSizeMB * 1024 * 1024
}
if cfg.DeleteMinInputFiles < 2 {
cfg.DeleteMinInputFiles = defaultDeleteMinInputFiles
}
if cfg.DeleteMaxFileGroupSizeBytes <= 0 {
cfg.DeleteMaxFileGroupSizeBytes = defaultDeleteMaxGroupSizeMB * 1024 * 1024
}
if cfg.DeleteMaxOutputFiles <= 0 {
cfg.DeleteMaxOutputFiles = defaultDeleteMaxOutputFiles
}
if cfg.MinManifestsToRewrite < minManifestsToRewrite {
cfg.MinManifestsToRewrite = minManifestsToRewrite
}
return cfg
}
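A quick sketch of the shared clamping (hypothetical values map; constants as defined above): a sub-minimum delete_min_input_files is lifted to the floor, and unset sizes fall back to their MB defaults.

values := map[string]*plugin_pb.ConfigValue{
    "delete_min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
}
cfg := ParseConfig(values)
// cfg.DeleteMinInputFiles == defaultDeleteMinInputFiles (1 is below the floor of 2)
// cfg.TargetFileSizeBytes == defaultTargetFileSizeMB * 1024 * 1024 (key absent, so defaulted)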
// parseOperations returns the ordered list of maintenance operations to execute.
// Order follows Iceberg best practices: compact → expire_snapshots → remove_orphans → rewrite_manifests.
// Order follows Iceberg best practices: compact → rewrite_position_delete_files
// → expire_snapshots → remove_orphans → rewrite_manifests.
// Returns an error if any unknown operation is specified or the result would be empty.
func parseOperations(ops string) ([]string, error) {
ops = strings.TrimSpace(strings.ToLower(ops))
if ops == "" || ops == "all" {
return []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, nil
return []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, nil
}
validOps := map[string]struct{}{
"compact": {},
"rewrite_position_delete_files": {},
"expire_snapshots": {},
"remove_orphans": {},
"rewrite_manifests": {},
@@ -111,13 +163,14 @@ func parseOperations(ops string) ([]string, error) {
continue
}
if _, ok := validOps[op]; !ok {
return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, expire_snapshots, remove_orphans, rewrite_manifests)", op)
return nil, fmt.Errorf("unknown maintenance operation %q (valid: compact, rewrite_position_delete_files, expire_snapshots, remove_orphans, rewrite_manifests)", op)
}
requested[op] = struct{}{}
}
// Return in canonical order: compact → expire_snapshots → remove_orphans → rewrite_manifests
canonicalOrder := []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}
// Return in canonical order: compact → rewrite_position_delete_files →
// expire_snapshots → remove_orphans → rewrite_manifests
canonicalOrder := []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}
var result []string
for _, op := range canonicalOrder {
if _, ok := requested[op]; ok {

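For example (a hypothetical call against parseOperations as shown above), input order does not matter; the result always follows the canonical order:

ops, err := parseOperations("rewrite_manifests, compact")
// ops == []string{"compact", "rewrite_manifests"}, err == nil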
View File

@@ -0,0 +1,595 @@
package iceberg
import (
"bytes"
"context"
"fmt"
"math"
"path"
"sort"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/parquet-go/parquet-go"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
type deleteRewriteInput struct {
Entry iceberg.ManifestEntry
ReferencedPath string
Positions []int64
}
type deleteRewriteGroup struct {
SpecID int32
Partition map[int]any
PartitionKey string
ReferencedPath string
Inputs []deleteRewriteInput
TotalSize int64
}
type positionDeleteRow struct {
FilePath string `parquet:"file_path"`
Pos int64 `parquet:"pos"`
}
func hasEligibleDeleteRewrite(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
manifests []iceberg.ManifestFile,
config Config,
meta table.Metadata,
predicate *partitionPredicate,
) (bool, error) {
groups, _, err := collectDeleteRewriteGroups(ctx, filerClient, bucketName, tablePath, manifests)
if err != nil {
return false, err
}
specs := specByID(meta)
for _, group := range groups {
if predicate != nil {
spec, ok := specs[int(group.SpecID)]
if !ok {
continue
}
match, err := predicate.Matches(spec, group.Partition)
if err != nil {
return false, err
}
if !match {
continue
}
}
if groupEligibleForRewrite(group, config) {
return true, nil
}
}
return false, nil
}
func collectDeleteRewriteGroups(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
manifests []iceberg.ManifestFile,
) (map[string]*deleteRewriteGroup, []iceberg.ManifestEntry, error) {
groups := make(map[string]*deleteRewriteGroup)
var allPositionEntries []iceberg.ManifestEntry
for _, mf := range manifests {
if mf.ManifestContent() != iceberg.ManifestContentDeletes {
continue
}
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
if err != nil {
return nil, nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
return nil, nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), err)
}
for _, entry := range entries {
if entry.DataFile().ContentType() != iceberg.EntryContentPosDeletes {
continue
}
allPositionEntries = append(allPositionEntries, entry)
fileDeletes, err := readPositionDeleteFile(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath())
if err != nil {
return nil, nil, fmt.Errorf("read position delete file %s: %w", entry.DataFile().FilePath(), err)
}
if len(fileDeletes) != 1 {
// Phase 1 only rewrites files that target a single data file.
continue
}
var referencedPath string
var positions []int64
for fp, pos := range fileDeletes {
referencedPath = normalizeIcebergPath(fp, bucketName, tablePath)
positions = append(positions, pos...)
}
sort.Slice(positions, func(i, j int) bool { return positions[i] < positions[j] })
partKey := partitionKey(entry.DataFile().Partition())
groupKey := fmt.Sprintf("spec%d\x00%s\x00%s", entry.DataFile().SpecID(), partKey, referencedPath)
group, ok := groups[groupKey]
if !ok {
group = &deleteRewriteGroup{
SpecID: entry.DataFile().SpecID(),
Partition: entry.DataFile().Partition(),
PartitionKey: partKey,
ReferencedPath: referencedPath,
}
groups[groupKey] = group
}
group.Inputs = append(group.Inputs, deleteRewriteInput{
Entry: entry,
ReferencedPath: referencedPath,
Positions: positions,
})
group.TotalSize += entry.DataFile().FileSizeBytes()
}
}
return groups, allPositionEntries, nil
}
func groupEligibleForRewrite(group *deleteRewriteGroup, config Config) bool {
if group == nil {
return false
}
if len(group.Inputs) < 2 {
return false
}
if group.TotalSize > config.DeleteMaxFileGroupSizeBytes {
return false
}
target := config.DeleteTargetFileSizeBytes
if target <= 0 {
target = defaultDeleteTargetFileSizeMB * 1024 * 1024
}
outputFiles := int64(estimatedDeleteOutputFiles(group.TotalSize, target))
if config.DeleteMaxOutputFiles > 0 && outputFiles > config.DeleteMaxOutputFiles {
return false
}
return int64(len(group.Inputs)) >= config.DeleteMinInputFiles
}
func estimatedDeleteOutputFiles(totalSize, targetSize int64) int {
if totalSize <= 0 || targetSize <= 0 {
return 1
}
count := int(math.Ceil(float64(totalSize) / float64(targetSize)))
if count < 1 {
return 1
}
return count
}
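A quick worked example with hypothetical sizes: a 96 MB group against a 64 MB target yields ceil(96/64) = 2 output files, which groupEligibleForRewrite then checks against DeleteMaxOutputFiles.

n := estimatedDeleteOutputFiles(96*1024*1024, 64*1024*1024)
// n == 2; a group that would produce more than config.DeleteMaxOutputFiles files is skipped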
func manifestEntrySeqNum(entry iceberg.ManifestEntry) *int64 {
seqNum := entry.SequenceNum()
if seqNum < 0 {
return nil
}
return &seqNum
}
func manifestEntryFileSeqNum(entry iceberg.ManifestEntry) *int64 {
if fileSeqNum := entry.FileSequenceNum(); fileSeqNum != nil {
value := *fileSeqNum
return &value
}
return manifestEntrySeqNum(entry)
}
func writeManifestWithContent(
filename string,
version int,
spec iceberg.PartitionSpec,
schema *iceberg.Schema,
snapshotID int64,
entries []iceberg.ManifestEntry,
content iceberg.ManifestContent,
) (iceberg.ManifestFile, []byte, error) {
var manifestBuf bytes.Buffer
mf, err := iceberg.WriteManifest(filename, &manifestBuf, version, spec, schema, snapshotID, entries)
if err != nil {
return nil, nil, err
}
manifestBytes := manifestBuf.Bytes()
if content == iceberg.ManifestContentDeletes {
manifestBytes, err = patchManifestContentBytesToDeletes(manifestBytes)
if err != nil {
return nil, nil, err
}
}
rebuilt := iceberg.NewManifestFile(version, filename, int64(len(manifestBytes)), int32(spec.ID()), snapshotID).
Content(content).
AddedFiles(mf.AddedDataFiles()).
ExistingFiles(mf.ExistingDataFiles()).
DeletedFiles(mf.DeletedDataFiles()).
AddedRows(mf.AddedRows()).
ExistingRows(mf.ExistingRows()).
DeletedRows(mf.DeletedRows()).
Partitions(mf.Partitions()).
Build()
return rebuilt, manifestBytes, nil
}
func patchManifestContentBytesToDeletes(manifestBytes []byte) ([]byte, error) {
old := append([]byte{0x0e}, []byte("content")...)
old = append(old, 0x08)
old = append(old, []byte("data")...)
replacement := append([]byte{0x0e}, []byte("content")...)
replacement = append(replacement, 0x0e)
replacement = append(replacement, []byte("deletes")...)
result := bytes.Replace(manifestBytes, old, replacement, 1)
if bytes.Equal(result, manifestBytes) {
return nil, fmt.Errorf("delete manifest content patch failed")
}
return result, nil
}
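For reference, the magic bytes above follow Avro's binary string encoding, where each string is prefixed by its byte length as a zig-zag varint (sketch below; zigzag is a hypothetical helper, not part of this change):

func zigzag(n int64) uint64 { return uint64((n << 1) ^ (n >> 63)) }

// zigzag(7) == 14 == 0x0e, the length prefix for "content" and "deletes"
// zigzag(4) == 8  == 0x08, the length prefix for "data"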
func writePositionDeleteFile(rows []positionDeleteRow) ([]byte, error) {
var buf bytes.Buffer
writer := parquet.NewWriter(&buf, parquet.SchemaOf(new(positionDeleteRow)))
for _, row := range rows {
if err := writer.Write(&row); err != nil {
return nil, fmt.Errorf("write position delete row: %w", err)
}
}
if err := writer.Close(); err != nil {
return nil, fmt.Errorf("close position delete file: %w", err)
}
return buf.Bytes(), nil
}
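A minimal round-trip sketch, assuming parquet-go's generic Read helper:

data, err := writePositionDeleteFile([]positionDeleteRow{
    {FilePath: "data/d1.parquet", Pos: 0},
    {FilePath: "data/d1.parquet", Pos: 2},
})
if err != nil {
    // handle error
}
rows, err := parquet.Read[positionDeleteRow](bytes.NewReader(data), int64(len(data)))
// rows[0].Pos == 0, rows[1].Pos == 2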
func (h *Handler) rewritePositionDeleteFiles(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
config Config,
) (string, map[string]int64, error) {
start := time.Now()
meta, metadataFileName, err := loadCurrentMetadata(ctx, filerClient, bucketName, tablePath)
if err != nil {
return "", nil, fmt.Errorf("load metadata: %w", err)
}
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
return "no current snapshot", nil, nil
}
manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList)
if err != nil {
return "", nil, fmt.Errorf("read manifest list: %w", err)
}
manifests, err := iceberg.ReadManifestList(bytes.NewReader(manifestListData))
if err != nil {
return "", nil, fmt.Errorf("parse manifest list: %w", err)
}
var dataManifests []iceberg.ManifestFile
var allEqualityEntries []iceberg.ManifestEntry
for _, mf := range manifests {
switch mf.ManifestContent() {
case iceberg.ManifestContentData:
dataManifests = append(dataManifests, mf)
case iceberg.ManifestContentDeletes:
manifestData, readErr := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
if readErr != nil {
return "", nil, fmt.Errorf("read delete manifest %s: %w", mf.FilePath(), readErr)
}
entries, parseErr := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if parseErr != nil {
return "", nil, fmt.Errorf("parse delete manifest %s: %w", mf.FilePath(), parseErr)
}
for _, entry := range entries {
if entry.DataFile().ContentType() == iceberg.EntryContentEqDeletes {
allEqualityEntries = append(allEqualityEntries, entry)
}
}
}
}
groupMap, allPositionEntries, err := collectDeleteRewriteGroups(ctx, filerClient, bucketName, tablePath, manifests)
if err != nil {
return "", nil, err
}
if len(groupMap) == 0 {
return "no position delete files eligible for rewrite", nil, nil
}
type artifact struct {
dir, fileName string
}
var writtenArtifacts []artifact
committed := false
defer func() {
if committed || len(writtenArtifacts) == 0 {
return
}
cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
for _, a := range writtenArtifacts {
if err := deleteFilerFile(cleanupCtx, filerClient, a.dir, a.fileName); err != nil {
glog.Warningf("iceberg delete rewrite: failed to clean up artifact %s/%s: %v", a.dir, a.fileName, err)
}
}
}()
specsByID := specByID(meta)
predicate, err := parsePartitionPredicate(config.Where, meta)
if err != nil {
return "", nil, err
}
type specEntries struct {
specID int32
entries []iceberg.ManifestEntry
}
specEntriesMap := make(map[int32]*specEntries)
addToSpec := func(specID int32, entry iceberg.ManifestEntry) {
se, ok := specEntriesMap[specID]
if !ok {
se = &specEntries{specID: specID}
specEntriesMap[specID] = se
}
se.entries = append(se.entries, entry)
}
newSnapID := time.Now().UnixMilli()
version := meta.Version()
snapshotID := currentSnap.SnapshotID
seqNum := currentSnap.SequenceNumber + 1
metaDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "metadata")
dataDir := path.Join(s3tables.TablesPath, bucketName, tablePath, "data")
artifactSuffix := compactRandomSuffix()
replacedPaths := make(map[string]struct{})
var rewrittenGroups int64
var skippedGroups int64
var deleteFilesRewritten int64
var deleteFilesWritten int64
var deleteBytesRewritten int64
sortedKeys := make([]string, 0, len(groupMap))
for key := range groupMap {
sortedKeys = append(sortedKeys, key)
}
sort.Strings(sortedKeys)
for _, key := range sortedKeys {
group := groupMap[key]
if predicate != nil {
spec, ok := specsByID[int(group.SpecID)]
if !ok {
continue
}
match, err := predicate.Matches(spec, group.Partition)
if err != nil {
return "", nil, err
}
if !match {
skippedGroups++
continue
}
}
if !groupEligibleForRewrite(group, config) {
skippedGroups++
continue
}
rows := make([]positionDeleteRow, 0)
for _, input := range group.Inputs {
for _, pos := range input.Positions {
rows = append(rows, positionDeleteRow{FilePath: input.ReferencedPath, Pos: pos})
}
replacedPaths[input.Entry.DataFile().FilePath()] = struct{}{}
deleteFilesRewritten++
deleteBytesRewritten += input.Entry.DataFile().FileSizeBytes()
}
sort.Slice(rows, func(i, j int) bool {
if rows[i].FilePath != rows[j].FilePath {
return rows[i].FilePath < rows[j].FilePath
}
return rows[i].Pos < rows[j].Pos
})
outputFiles := estimatedDeleteOutputFiles(group.TotalSize, config.DeleteTargetFileSizeBytes)
rowsPerFile := (len(rows) + outputFiles - 1) / outputFiles
if rowsPerFile < 1 {
rowsPerFile = len(rows)
}
for startIdx, fileIdx := 0, 0; startIdx < len(rows); startIdx, fileIdx = startIdx+rowsPerFile, fileIdx+1 {
endIdx := startIdx + rowsPerFile
if endIdx > len(rows) {
endIdx = len(rows)
}
outputRows := rows[startIdx:endIdx]
deleteBytes, err := writePositionDeleteFile(outputRows)
if err != nil {
return "", nil, err
}
fileName := fmt.Sprintf("rewrite-delete-%d-%s-%d.parquet", newSnapID, artifactSuffix, deleteFilesWritten)
if err := ensureFilerDir(ctx, filerClient, dataDir); err != nil {
return "", nil, fmt.Errorf("ensure data dir: %w", err)
}
if err := saveFilerFile(ctx, filerClient, dataDir, fileName, deleteBytes); err != nil {
return "", nil, fmt.Errorf("save rewritten delete file: %w", err)
}
writtenArtifacts = append(writtenArtifacts, artifact{dir: dataDir, fileName: fileName})
spec, ok := specsByID[int(group.SpecID)]
if !ok {
return "", nil, fmt.Errorf("partition spec %d not found", group.SpecID)
}
dfBuilder, err := iceberg.NewDataFileBuilder(
spec,
iceberg.EntryContentPosDeletes,
path.Join("data", fileName),
iceberg.ParquetFile,
group.Partition,
nil, nil,
int64(len(outputRows)),
int64(len(deleteBytes)),
)
if err != nil {
return "", nil, fmt.Errorf("build rewritten delete file: %w", err)
}
entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &newSnapID, nil, nil, dfBuilder.Build())
addToSpec(group.SpecID, entry)
deleteFilesWritten++
}
for _, input := range group.Inputs {
delEntry := iceberg.NewManifestEntry(
iceberg.EntryStatusDELETED,
&newSnapID,
manifestEntrySeqNum(input.Entry),
manifestEntryFileSeqNum(input.Entry),
input.Entry.DataFile(),
)
addToSpec(group.SpecID, delEntry)
}
rewrittenGroups++
}
if rewrittenGroups == 0 {
return "no position delete files eligible for rewrite", nil, nil
}
for _, entry := range allEqualityEntries {
existingEntry := iceberg.NewManifestEntry(
iceberg.EntryStatusEXISTING,
func() *int64 { id := entry.SnapshotID(); return &id }(),
manifestEntrySeqNum(entry),
manifestEntryFileSeqNum(entry),
entry.DataFile(),
)
addToSpec(entry.DataFile().SpecID(), existingEntry)
}
for _, entry := range allPositionEntries {
if _, replaced := replacedPaths[entry.DataFile().FilePath()]; replaced {
continue
}
existingEntry := iceberg.NewManifestEntry(
iceberg.EntryStatusEXISTING,
func() *int64 { id := entry.SnapshotID(); return &id }(),
manifestEntrySeqNum(entry),
manifestEntryFileSeqNum(entry),
entry.DataFile(),
)
addToSpec(entry.DataFile().SpecID(), existingEntry)
}
sortedSpecIDs := make([]int32, 0, len(specEntriesMap))
for specID := range specEntriesMap {
sortedSpecIDs = append(sortedSpecIDs, specID)
}
sort.Slice(sortedSpecIDs, func(i, j int) bool { return sortedSpecIDs[i] < sortedSpecIDs[j] })
allManifests := make([]iceberg.ManifestFile, 0, len(dataManifests)+len(sortedSpecIDs))
allManifests = append(allManifests, dataManifests...)
for _, specID := range sortedSpecIDs {
spec, ok := specsByID[int(specID)]
if !ok {
return "", nil, fmt.Errorf("partition spec %d not found", specID)
}
manifestName := fmt.Sprintf("rewrite-delete-%d-%s-spec%d.avro", newSnapID, artifactSuffix, specID)
manifestPath := path.Join("metadata", manifestName)
mf, manifestBytes, err := writeManifestWithContent(
manifestPath,
version,
spec,
meta.CurrentSchema(),
newSnapID,
specEntriesMap[specID].entries,
iceberg.ManifestContentDeletes,
)
if err != nil {
return "", nil, fmt.Errorf("write delete manifest for spec %d: %w", specID, err)
}
if err := saveFilerFile(ctx, filerClient, metaDir, manifestName, manifestBytes); err != nil {
return "", nil, fmt.Errorf("save delete manifest for spec %d: %w", specID, err)
}
writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestName})
allManifests = append(allManifests, mf)
}
var manifestListBuf bytes.Buffer
if err := iceberg.WriteManifestList(version, &manifestListBuf, newSnapID, &snapshotID, &seqNum, 0, allManifests); err != nil {
return "", nil, fmt.Errorf("write delete manifest list: %w", err)
}
manifestListName := fmt.Sprintf("snap-%d-%s.avro", newSnapID, artifactSuffix)
if err := saveFilerFile(ctx, filerClient, metaDir, manifestListName, manifestListBuf.Bytes()); err != nil {
return "", nil, fmt.Errorf("save delete manifest list: %w", err)
}
writtenArtifacts = append(writtenArtifacts, artifact{dir: metaDir, fileName: manifestListName})
manifestListLocation := path.Join("metadata", manifestListName)
err = h.commitWithRetry(ctx, filerClient, bucketName, tablePath, metadataFileName, config, func(currentMeta table.Metadata, builder *table.MetadataBuilder) error {
cs := currentMeta.CurrentSnapshot()
if cs == nil || cs.SnapshotID != snapshotID {
return errStalePlan
}
newSnapshot := &table.Snapshot{
SnapshotID: newSnapID,
ParentSnapshotID: &snapshotID,
SequenceNumber: seqNum,
TimestampMs: newSnapID,
ManifestList: manifestListLocation,
Summary: &table.Summary{
Operation: table.OpReplace,
Properties: map[string]string{
"maintenance": "rewrite_position_delete_files",
"delete-files-rewritten": fmt.Sprintf("%d", deleteFilesRewritten),
"delete-files-written": fmt.Sprintf("%d", deleteFilesWritten),
"delete-groups": fmt.Sprintf("%d", rewrittenGroups),
},
},
SchemaID: func() *int {
id := meta.CurrentSchema().ID
return &id
}(),
}
if err := builder.AddSnapshot(newSnapshot); err != nil {
return err
}
return builder.SetSnapshotRef(table.MainBranch, newSnapID, table.BranchRef)
})
if err != nil {
return "", nil, fmt.Errorf("commit delete rewrite: %w", err)
}
committed = true
metrics := map[string]int64{
MetricDeleteFilesRewritten: deleteFilesRewritten,
MetricDeleteFilesWritten: deleteFilesWritten,
MetricDeleteBytesRewritten: deleteBytesRewritten,
MetricDeleteGroupsPlanned: rewrittenGroups,
MetricDeleteGroupsSkipped: skippedGroups,
MetricDurationMs: time.Since(start).Milliseconds(),
}
return fmt.Sprintf(
"rewrote %d position delete files into %d across %d group(s)",
deleteFilesRewritten,
deleteFilesWritten,
rewrittenGroups,
), metrics, nil
}

View File

@@ -153,20 +153,14 @@ func (h *Handler) scanTablesForMaintenance(
}
func normalizeDetectionConfig(config Config) Config {
normalized := config
if normalized.TargetFileSizeBytes <= 0 {
normalized.TargetFileSizeBytes = defaultTargetFileSizeMB * 1024 * 1024
config = applyThresholdDefaults(config)
if config.SnapshotRetentionHours <= 0 {
config.SnapshotRetentionHours = defaultSnapshotRetentionHours
}
if normalized.MinInputFiles < 2 {
normalized.MinInputFiles = defaultMinInputFiles
if config.MaxSnapshotsToKeep <= 0 {
config.MaxSnapshotsToKeep = defaultMaxSnapshotsToKeep
}
if normalized.MinManifestsToRewrite < minManifestsToRewrite {
normalized.MinManifestsToRewrite = minManifestsToRewrite
}
if normalized.OrphanOlderThanHours <= 0 {
normalized.OrphanOlderThanHours = defaultOrphanOlderThanHours
}
return normalized
return config
}
func (h *Handler) tableNeedsMaintenance(
@@ -181,6 +175,25 @@ func (h *Handler) tableNeedsMaintenance(
) (bool, error) {
config = normalizeDetectionConfig(config)
var predicate *partitionPredicate
if strings.TrimSpace(config.Where) != "" {
needsPredicate := false
for _, op := range ops {
if op == "compact" || op == "rewrite_position_delete_files" || op == "rewrite_manifests" {
needsPredicate = true
break
}
}
if needsPredicate {
var err error
predicate, err = parsePartitionPredicate(config.Where, meta)
if err != nil {
return false, err
}
}
}
_ = predicate // used by rewrite_position_delete_files; planning index handles compact/rewrite_manifests
// Evaluate the metadata-only expiration check first so large tables do not
// pay for manifest reads when snapshot expiry already makes them eligible.
for _, op := range ops {
@@ -267,6 +280,20 @@ func (h *Handler) tableNeedsMaintenance(
if eligible {
return true, nil
}
case "rewrite_position_delete_files":
manifests, err := getCurrentManifests()
if err != nil {
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
continue
}
eligible, err := hasEligibleDeleteRewrite(ctx, filerClient, bucketName, tablePath, manifests, config, meta, predicate)
if err != nil {
opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err))
continue
}
if eligible {
return true, nil
}
case "rewrite_manifests":
eligible, err := checkPlanningIndex(op, (*planningIndex).rewriteManifestsEligible)
if err != nil {
@@ -351,6 +378,8 @@ func hasEligibleCompaction(
bucketName, tablePath string,
manifests []iceberg.ManifestFile,
config Config,
meta table.Metadata,
predicate *partitionPredicate,
) (bool, error) {
if len(manifests) == 0 {
return false, nil
@@ -390,10 +419,81 @@ func hasEligibleCompaction(
allEntries = append(allEntries, entries...)
}
bins := buildCompactionBins(allEntries, config.TargetFileSizeBytes, minInputFiles)
candidateEntries := allEntries
if predicate != nil {
specsByID := specByID(meta)
candidateEntries = make([]iceberg.ManifestEntry, 0, len(allEntries))
for _, entry := range allEntries {
spec, ok := specsByID[int(entry.DataFile().SpecID())]
if !ok {
continue
}
match, err := predicate.Matches(spec, entry.DataFile().Partition())
if err != nil {
return false, err
}
if match {
candidateEntries = append(candidateEntries, entry)
}
}
}
bins := buildCompactionBins(candidateEntries, config.TargetFileSizeBytes, minInputFiles)
return len(bins) > 0, nil
}
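The same predicate-filtering loop now appears in compactDataFiles, hasEligibleCompaction, and the delete-rewrite path; a hypothetical shared helper (a sketch, not part of this commit) could consolidate it:

func filterEntriesByPredicate(
    entries []iceberg.ManifestEntry,
    specs map[int]iceberg.PartitionSpec,
    predicate *partitionPredicate,
) ([]iceberg.ManifestEntry, error) {
    if predicate == nil {
        return entries, nil
    }
    filtered := make([]iceberg.ManifestEntry, 0, len(entries))
    for _, entry := range entries {
        spec, ok := specs[int(entry.DataFile().SpecID())]
        if !ok {
            continue // unknown spec: treated as non-matching, same as the inline loops
        }
        match, err := predicate.Matches(spec, entry.DataFile().Partition())
        if err != nil {
            return nil, err
        }
        if match {
            filtered = append(filtered, entry)
        }
    }
    return filtered, nil
}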
func countDataManifestsForRewrite(
ctx context.Context,
filerClient filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
manifests []iceberg.ManifestFile,
meta table.Metadata,
predicate *partitionPredicate,
) (int64, error) {
if predicate == nil {
return countDataManifests(manifests), nil
}
specsByID := specByID(meta)
var count int64
for _, mf := range manifests {
if mf.ManifestContent() != iceberg.ManifestContentData {
continue
}
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
if err != nil {
return 0, fmt.Errorf("read manifest %s: %w", mf.FilePath(), err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
return 0, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
}
if len(entries) == 0 {
continue
}
spec, ok := specsByID[int(mf.PartitionSpecID())]
if !ok {
continue
}
allMatch := true // entries is non-empty; the empty case continues above
for _, entry := range entries {
match, err := predicate.Matches(spec, entry.DataFile().Partition())
if err != nil {
return 0, err
}
if !match {
allMatch = false
break
}
}
if allMatch {
count++
}
}
return count, nil
}
func compactionMinInputFiles(minInputFiles int64) (int, error) {
// Ensure the configured value is positive and fits into the platform's int type
if minInputFiles <= 0 {

View File

@@ -498,7 +498,7 @@ func (r *recordingExecutionSender) SendCompleted(c *plugin_pb.JobCompleted) erro
func TestExpireSnapshotsExecution(t *testing.T) {
fs, client := startFakeFiler(t)
now := time.Now().UnixMilli()
now := time.Now().Add(-10 * time.Second).UnixMilli()
setup := tableSetup{
BucketName: "test-bucket",
Namespace: "analytics",
@@ -541,7 +541,7 @@ func TestExpireSnapshotsExecution(t *testing.T) {
func TestExpireSnapshotsNothingToExpire(t *testing.T) {
fs, client := startFakeFiler(t)
now := time.Now().UnixMilli()
now := time.Now().Add(-10 * time.Second).UnixMilli()
setup := tableSetup{
BucketName: "test-bucket",
Namespace: "ns",
@@ -1184,13 +1184,16 @@ func TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError(t *testin
Namespace: "analytics",
TableName: "events",
Snapshots: []table.Snapshot{
{SnapshotID: 1, TimestampMs: now - 1, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1},
{SnapshotID: 2, TimestampMs: now + 1, ManifestList: "metadata/snap-2.avro", SequenceNumber: 2},
},
}
populateTable(t, fs, setup)
// Corrupt manifest lists so compaction evaluation fails.
metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
manifestListName := path.Base(setup.Snapshots[0].ManifestList)
for _, snap := range setup.Snapshots {
manifestListName := path.Base(snap.ManifestList)
fs.putEntry(metaDir, manifestListName, &filer_pb.Entry{
Name: manifestListName,
Attributes: &filer_pb.FuseAttributes{
@@ -1199,11 +1202,12 @@ func TestDetectSchedulesSnapshotExpiryDespiteCompactionEvaluationError(t *testin
},
Content: []byte("not-a-manifest-list"),
})
}
handler := NewHandler(nil)
config := Config{
SnapshotRetentionHours: 0,
MaxSnapshotsToKeep: 10,
SnapshotRetentionHours: 24 * 365, // very long retention so age doesn't trigger
MaxSnapshotsToKeep: 1, // 2 snapshots > 1 triggers expiry
Operations: "compact,expire_snapshots",
}
@@ -2207,6 +2211,120 @@ func populateTableWithDeleteFiles(
return meta
}
func loadLiveDeleteFilePaths(
t *testing.T,
client filer_pb.SeaweedFilerClient,
bucketName, tablePath string,
) (posPaths, eqPaths []string) {
t.Helper()
meta, _, err := loadCurrentMetadata(context.Background(), client, bucketName, tablePath)
if err != nil {
t.Fatalf("loadCurrentMetadata: %v", err)
}
manifests, err := loadCurrentManifests(context.Background(), client, bucketName, tablePath, meta)
if err != nil {
t.Fatalf("loadCurrentManifests: %v", err)
}
for _, mf := range manifests {
if mf.ManifestContent() != iceberg.ManifestContentDeletes {
continue
}
manifestData, err := loadFileByIcebergPath(context.Background(), client, bucketName, tablePath, mf.FilePath())
if err != nil {
t.Fatalf("load delete manifest: %v", err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
t.Fatalf("read delete manifest: %v", err)
}
for _, entry := range entries {
switch entry.DataFile().ContentType() {
case iceberg.EntryContentPosDeletes:
posPaths = append(posPaths, entry.DataFile().FilePath())
case iceberg.EntryContentEqDeletes:
eqPaths = append(eqPaths, entry.DataFile().FilePath())
}
}
}
sort.Strings(posPaths)
sort.Strings(eqPaths)
return posPaths, eqPaths
}
func rewriteDeleteManifestsAsMixed(
t *testing.T,
fs *fakeFilerServer,
client filer_pb.SeaweedFilerClient,
setup tableSetup,
) {
t.Helper()
meta, _, err := loadCurrentMetadata(context.Background(), client, setup.BucketName, setup.tablePath())
if err != nil {
t.Fatalf("loadCurrentMetadata: %v", err)
}
manifests, err := loadCurrentManifests(context.Background(), client, setup.BucketName, setup.tablePath(), meta)
if err != nil {
t.Fatalf("loadCurrentManifests: %v", err)
}
var dataManifests []iceberg.ManifestFile
var deleteEntries []iceberg.ManifestEntry
for _, mf := range manifests {
if mf.ManifestContent() == iceberg.ManifestContentData {
dataManifests = append(dataManifests, mf)
continue
}
manifestData, err := loadFileByIcebergPath(context.Background(), client, setup.BucketName, setup.tablePath(), mf.FilePath())
if err != nil {
t.Fatalf("load delete manifest: %v", err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
t.Fatalf("read delete manifest: %v", err)
}
for _, entry := range entries {
deleteEntries = append(deleteEntries, entry)
}
}
spec := *iceberg.UnpartitionedSpec
version := meta.Version()
metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata")
manifestName := "mixed-delete-manifest-1.avro"
manifestPath := path.Join("metadata", manifestName)
var manifestBuf bytes.Buffer
_, err = iceberg.WriteManifest(manifestPath, &manifestBuf, version, spec, meta.CurrentSchema(), 1, deleteEntries)
if err != nil {
t.Fatalf("write mixed delete manifest: %v", err)
}
mixedBytes := patchManifestContentToDeletes(t, manifestBuf.Bytes())
fs.putEntry(metaDir, manifestName, &filer_pb.Entry{
Name: manifestName, Content: mixedBytes,
Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(len(mixedBytes))},
})
mixedManifest := iceberg.NewManifestFile(version, manifestPath, int64(len(mixedBytes)), int32(spec.ID()), 1).
Content(iceberg.ManifestContentDeletes).
AddedFiles(int32(len(deleteEntries))).
Build()
var manifestListBuf bytes.Buffer
seqNum := int64(1)
allManifests := append(dataManifests, mixedManifest)
if err := iceberg.WriteManifestList(version, &manifestListBuf, 1, nil, &seqNum, 0, allManifests); err != nil {
t.Fatalf("write mixed manifest list: %v", err)
}
fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{
Name: "snap-1.avro", Content: manifestListBuf.Bytes(),
Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(manifestListBuf.Len())},
})
}
func TestCompactDataFilesMetrics(t *testing.T) {
fs, client := startFakeFiler(t)
@@ -2713,3 +2831,369 @@ func TestCompactDataFilesWithMixedDeletes(t *testing.T) {
}
}
}
func TestRewritePositionDeleteFilesExecution(t *testing.T) {
fs, client := startFakeFiler(t)
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populateTableWithDeleteFiles(t, fs, setup,
[]struct {
Name string
Rows []struct {
ID int64
Name string
}
}{
{"d1.parquet", []struct {
ID int64
Name string
}{{1, "alice"}, {2, "bob"}, {3, "charlie"}}},
},
[]struct {
Name string
Rows []struct {
FilePath string
Pos int64
}
}{
{"pd1.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 0}, {"data/d1.parquet", 2}}},
{"pd2.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 1}}},
},
nil,
)
handler := NewHandler(nil)
config := Config{
DeleteTargetFileSizeBytes: 64 * 1024 * 1024,
DeleteMinInputFiles: 2,
DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024,
DeleteMaxOutputFiles: 4,
MaxCommitRetries: 3,
}
result, metrics, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config)
if err != nil {
t.Fatalf("rewritePositionDeleteFiles: %v", err)
}
if !strings.Contains(result, "rewrote 2 position delete files into 1") {
t.Fatalf("unexpected result: %q", result)
}
if metrics[MetricDeleteFilesRewritten] != 2 {
t.Fatalf("expected 2 rewritten files, got %d", metrics[MetricDeleteFilesRewritten])
}
if metrics[MetricDeleteFilesWritten] != 1 {
t.Fatalf("expected 1 written file, got %d", metrics[MetricDeleteFilesWritten])
}
liveDeletePaths, _ := loadLiveDeleteFilePaths(t, client, setup.BucketName, setup.tablePath())
if len(liveDeletePaths) != 1 {
t.Fatalf("expected 1 live rewritten delete file, got %v", liveDeletePaths)
}
if !strings.HasPrefix(liveDeletePaths[0], "data/rewrite-delete-") {
t.Fatalf("expected rewritten delete file path, got %q", liveDeletePaths[0])
}
}
func TestRewritePositionDeleteFilesDetection(t *testing.T) {
fs, client := startFakeFiler(t)
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populateTableWithDeleteFiles(t, fs, setup,
[]struct {
Name string
Rows []struct {
ID int64
Name string
}
}{
{"d1.parquet", []struct {
ID int64
Name string
}{{1, "alice"}, {2, "bob"}}},
},
[]struct {
Name string
Rows []struct {
FilePath string
Pos int64
}
}{
{"pd1.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 0}}},
{"pd2.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 1}}},
},
nil,
)
handler := NewHandler(nil)
config := Config{
Operations: "rewrite_position_delete_files",
DeleteTargetFileSizeBytes: 64 * 1024 * 1024,
DeleteMinInputFiles: 2,
DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024,
DeleteMaxOutputFiles: 4,
}
tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0)
if err != nil {
t.Fatalf("scanTablesForMaintenance: %v", err)
}
if len(tables) != 1 {
t.Fatalf("expected 1 table needing delete rewrite, got %d", len(tables))
}
}
func TestRewritePositionDeleteFilesSkipsSingleFile(t *testing.T) {
fs, client := startFakeFiler(t)
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populateTableWithDeleteFiles(t, fs, setup,
[]struct {
Name string
Rows []struct {
ID int64
Name string
}
}{
{"d1.parquet", []struct {
ID int64
Name string
}{{1, "alice"}, {2, "bob"}}},
},
[]struct {
Name string
Rows []struct {
FilePath string
Pos int64
}
}{
{"pd1.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 0}}},
},
nil,
)
handler := NewHandler(nil)
config := Config{
DeleteTargetFileSizeBytes: 64 * 1024 * 1024,
DeleteMinInputFiles: 2,
DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024,
DeleteMaxOutputFiles: 4,
MaxCommitRetries: 3,
}
result, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config)
if err != nil {
t.Fatalf("rewritePositionDeleteFiles: %v", err)
}
if !strings.Contains(result, "no position delete files eligible") {
t.Fatalf("unexpected result: %q", result)
}
}
func TestRewritePositionDeleteFilesRespectsMinInputFiles(t *testing.T) {
fs, client := startFakeFiler(t)
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populateTableWithDeleteFiles(t, fs, setup,
[]struct {
Name string
Rows []struct {
ID int64
Name string
}
}{
{"d1.parquet", []struct {
ID int64
Name string
}{{1, "alice"}, {2, "bob"}}},
},
[]struct {
Name string
Rows []struct {
FilePath string
Pos int64
}
}{
{"pd1.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 0}}},
{"pd2.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 1}}},
},
nil,
)
handler := NewHandler(nil)
config := Config{
DeleteTargetFileSizeBytes: 64 * 1024 * 1024,
DeleteMinInputFiles: 3,
DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024,
DeleteMaxOutputFiles: 4,
MaxCommitRetries: 3,
}
result, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config)
if err != nil {
t.Fatalf("rewritePositionDeleteFiles: %v", err)
}
if !strings.Contains(result, "no position delete files eligible") {
t.Fatalf("unexpected result: %q", result)
}
}
func TestRewritePositionDeleteFilesPreservesUnsupportedMultiTargetDeletes(t *testing.T) {
fs, client := startFakeFiler(t)
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populateTableWithDeleteFiles(t, fs, setup,
[]struct {
Name string
Rows []struct {
ID int64
Name string
}
}{
{"d1.parquet", []struct {
ID int64
Name string
}{{1, "alice"}, {2, "bob"}, {3, "charlie"}}},
{"d2.parquet", []struct {
ID int64
Name string
}{{4, "diana"}, {5, "eve"}}},
},
[]struct {
Name string
Rows []struct {
FilePath string
Pos int64
}
}{
{"pd1.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 0}}},
{"pd2.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 1}}},
{"pd3.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 2}, {"data/d2.parquet", 0}}},
},
nil,
)
handler := NewHandler(nil)
config := Config{
DeleteTargetFileSizeBytes: 64 * 1024 * 1024,
DeleteMinInputFiles: 2,
DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024,
DeleteMaxOutputFiles: 4,
MaxCommitRetries: 3,
}
if _, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config); err != nil {
t.Fatalf("rewritePositionDeleteFiles: %v", err)
}
posPaths, _ := loadLiveDeleteFilePaths(t, client, setup.BucketName, setup.tablePath())
if len(posPaths) != 2 {
t.Fatalf("expected rewritten file plus untouched multi-target file, got %v", posPaths)
}
if posPaths[0] != "data/pd3.parquet" && posPaths[1] != "data/pd3.parquet" {
t.Fatalf("expected multi-target delete file to be preserved, got %v", posPaths)
}
if !strings.HasPrefix(posPaths[0], "data/rewrite-delete-") && !strings.HasPrefix(posPaths[1], "data/rewrite-delete-") {
t.Fatalf("expected rewritten delete file to remain live, got %v", posPaths)
}
}
func TestRewritePositionDeleteFilesRebuildsMixedDeleteManifests(t *testing.T) {
fs, client := startFakeFiler(t)
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populateTableWithDeleteFiles(t, fs, setup,
[]struct {
Name string
Rows []struct {
ID int64
Name string
}
}{
{"d1.parquet", []struct {
ID int64
Name string
}{{1, "alice"}, {2, "bob"}, {3, "charlie"}}},
},
[]struct {
Name string
Rows []struct {
FilePath string
Pos int64
}
}{
{"pd1.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 0}}},
{"pd2.parquet", []struct {
FilePath string
Pos int64
}{{"data/d1.parquet", 1}}},
},
[]struct {
Name string
FieldIDs []int
Rows []struct {
ID int64
Name string
}
}{
{"eq1.parquet", []int{1}, []struct {
ID int64
Name string
}{{3, "charlie"}}},
},
)
rewriteDeleteManifestsAsMixed(t, fs, client, setup)
handler := NewHandler(nil)
config := Config{
DeleteTargetFileSizeBytes: 64 * 1024 * 1024,
DeleteMinInputFiles: 2,
DeleteMaxFileGroupSizeBytes: 128 * 1024 * 1024,
DeleteMaxOutputFiles: 4,
MaxCommitRetries: 3,
}
if _, _, err := handler.rewritePositionDeleteFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config); err != nil {
t.Fatalf("rewritePositionDeleteFiles: %v", err)
}
posPaths, eqPaths := loadLiveDeleteFilePaths(t, client, setup.BucketName, setup.tablePath())
if len(posPaths) != 1 || !strings.HasPrefix(posPaths[0], "data/rewrite-delete-") {
t.Fatalf("expected only the rewritten position delete file to remain live, got %v", posPaths)
}
if len(eqPaths) != 1 || eqPaths[0] != "data/eq1.parquet" {
t.Fatalf("expected equality delete file to be preserved, got %v", eqPaths)
}
}

View File

@@ -48,7 +48,7 @@ func (h *Handler) Capability() *plugin_pb.JobTypeCapability {
MaxDetectionConcurrency: 1,
MaxExecutionConcurrency: 4,
DisplayName: "Iceberg Maintenance",
Description: "Compacts, expires snapshots, removes orphans, and rewrites manifests for Iceberg tables in S3 table buckets",
Description: "Compacts data, rewrites delete files, expires snapshots, removes orphans, and rewrites manifests for Iceberg tables in S3 table buckets",
Weight: 50,
}
}
@@ -57,7 +57,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
return &plugin_pb.JobTypeDescriptor{
JobType: jobType,
DisplayName: "Iceberg Maintenance",
Description: "Automated maintenance for Iceberg tables: snapshot expiration, orphan removal, manifest rewriting",
Description: "Automated maintenance for Iceberg tables: data compaction, delete-file rewrite, snapshot expiration, orphan removal, and manifest rewriting",
Icon: "fas fa-snowflake",
DescriptorVersion: 1,
AdminConfigForm: &plugin_pb.ConfigForm{
@@ -159,7 +159,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
SectionId: "compaction",
Title: "Data Compaction",
Description: "Controls for bin-packing small Parquet data files.",
Description: "Controls for bin-packing or sorting small Parquet data files.",
Fields: []*plugin_pb.ConfigField{
{
Name: "target_file_size_mb",
@@ -184,6 +184,69 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_BOOL,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TOGGLE,
},
{
Name: "rewrite_strategy",
Label: "Rewrite Strategy",
Description: "binpack keeps the current row order; sort rewrites each compaction bin using sort_fields or the table sort order.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
Placeholder: "binpack or sort",
},
{
Name: "sort_fields",
Label: "Sort Fields",
Description: "Comma-separated field names for rewrite_strategy=sort. Blank uses the table sort order when present.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
Placeholder: "id, created_at",
},
{
Name: "sort_max_input_mb",
Label: "Sort Max Input (MB)",
Description: "Optional hard cap for the total bytes in a sorted compaction bin. Zero = no extra cap beyond binning.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
},
},
},
{
SectionId: "delete_rewrite",
Title: "Delete Rewrite",
Description: "Controls for rewriting small position-delete files into fewer larger files.",
Fields: []*plugin_pb.ConfigField{
{
Name: "delete_target_file_size_mb",
Label: "Delete Target File Size (MB)",
Description: "Target size for rewritten position-delete files.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
},
{
Name: "delete_min_input_files",
Label: "Delete Min Input Files",
Description: "Minimum number of position-delete files in a group before rewrite is triggered.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 2}},
},
{
Name: "delete_max_file_group_size_mb",
Label: "Delete Max Group Size (MB)",
Description: "Skip rewriting delete groups larger than this bound.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
},
{
Name: "delete_max_output_files",
Label: "Delete Max Output Files",
Description: "Maximum number of rewritten delete files a single group may produce.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_INT64,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_NUMBER,
MinValue: &plugin_pb.ConfigValue{Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 1}},
},
},
},
{
@@ -233,16 +296,28 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
{
Name: "operations",
Label: "Operations",
Description: "Comma-separated list of operations to run: compact, expire_snapshots, remove_orphans, rewrite_manifests, or 'all'.",
Description: "Comma-separated list of operations to run: compact, rewrite_position_delete_files, expire_snapshots, remove_orphans, rewrite_manifests, or 'all'.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
},
{
Name: "where",
Label: "Where Filter",
Description: "Optional partition filter for compact, rewrite_position_delete_files, and rewrite_manifests. Supports field = literal, field IN (...), and AND.",
FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING,
Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT,
Placeholder: "region = 'us' AND dt IN ('2026-03-15')",
},
},
},
},
DefaultValues: map[string]*plugin_pb.ConfigValue{
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
"min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}},
"delete_target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteTargetFileSizeMB}},
"delete_min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMinInputFiles}},
"delete_max_file_group_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxGroupSizeMB}},
"delete_max_output_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxOutputFiles}},
"min_manifests_to_rewrite": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinManifestsToRewrite}},
"snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}},
"max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}},
@@ -250,6 +325,10 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
"max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}},
"operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}},
"apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}},
"rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "binpack"}},
"sort_fields": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
"sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
"where": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
},
AdminRuntimeDefaults: &plugin_pb.AdminRuntimeDefaults{
@@ -266,12 +345,20 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor {
WorkerDefaultValues: map[string]*plugin_pb.ConfigValue{
"target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultTargetFileSizeMB}},
"min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMinInputFiles}},
"delete_target_file_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteTargetFileSizeMB}},
"delete_min_input_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMinInputFiles}},
"delete_max_file_group_size_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxGroupSizeMB}},
"delete_max_output_files": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultDeleteMaxOutputFiles}},
"snapshot_retention_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultSnapshotRetentionHours}},
"max_snapshots_to_keep": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxSnapshotsToKeep}},
"orphan_older_than_hours": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultOrphanOlderThanHours}},
"max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}},
"operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}},
"apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}},
"rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "binpack"}},
"sort_fields": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
"sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}},
"where": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}},
},
}
}
@@ -288,9 +375,13 @@ func (h *Handler) Detect(ctx context.Context, request *plugin_pb.RunDetectionReq
}
workerConfig := ParseConfig(request.GetWorkerConfigValues())
if _, err := parseOperations(workerConfig.Operations); err != nil {
ops, err := parseOperations(workerConfig.Operations)
if err != nil {
return fmt.Errorf("invalid operations config: %w", err)
}
if err := validateWhereOperations(workerConfig.Where, ops); err != nil {
return fmt.Errorf("invalid where config: %w", err)
}
// Detection interval is managed by the scheduler via AdminRuntimeDefaults.DetectionIntervalSeconds.
@@ -407,6 +498,9 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
if opsErr != nil {
return fmt.Errorf("invalid operations config: %w", opsErr)
}
if err := validateWhereOperations(workerConfig.Where, ops); err != nil {
return fmt.Errorf("invalid where config: %w", err)
}
// Send initial progress
if err := sender.SendProgress(&plugin_pb.JobProgressUpdate{
@@ -437,8 +531,8 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
completedOps := 0
allMetrics := make(map[string]int64)
// Execute operations in correct Iceberg maintenance order:
// expire_snapshots → remove_orphans → rewrite_manifests
// Execute operations in canonical maintenance order as defined by
// parseOperations.
for _, op := range ops {
select {
case <-ctx.Done():
@@ -478,6 +572,8 @@ func (h *Handler) Execute(ctx context.Context, request *plugin_pb.ExecuteJobRequ
Message: fmt.Sprintf("compacting bin %d of %d", binIdx+1, totalBins),
})
})
case "rewrite_position_delete_files":
opResult, opMetrics, opErr = h.rewritePositionDeleteFiles(ctx, filerClient, bucketName, tablePath, workerConfig)
case "expire_snapshots":
opResult, opMetrics, opErr = h.expireSnapshots(ctx, filerClient, bucketName, tablePath, workerConfig)
case "remove_orphans":

View File

@@ -42,15 +42,17 @@ func TestParseOperations(t *testing.T) {
expected []string
wantErr bool
}{
{"all", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
{"", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
{"all", []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
{"", []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
{"expire_snapshots", []string{"expire_snapshots"}, false},
{"compact", []string{"compact"}, false},
{"rewrite_position_delete_files", []string{"rewrite_position_delete_files"}, false},
{"rewrite_manifests,expire_snapshots", []string{"expire_snapshots", "rewrite_manifests"}, false},
{"compact,expire_snapshots", []string{"compact", "expire_snapshots"}, false},
{"remove_orphans, rewrite_manifests", []string{"remove_orphans", "rewrite_manifests"}, false},
{"expire_snapshots,remove_orphans,rewrite_manifests", []string{"expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
{"compact,expire_snapshots,remove_orphans,rewrite_manifests", []string{"compact", "expire_snapshots", "remove_orphans", "rewrite_manifests"}, false},
{"compact,rewrite_position_delete_files,rewrite_manifests", []string{"compact", "rewrite_position_delete_files", "rewrite_manifests"}, false},
{"unknown_op", nil, true},
{"expire_snapshots,bad_op", nil, true},
}
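
The table above relies on parseOperations emitting operations in canonical maintenance order regardless of input order. A minimal sketch of that canonicalization (the real implementation may differ; parseOperationsSketch is a hypothetical stand-in):

// Sketch: normalize a comma-separated operation list into canonical
// maintenance order; "" and "all" expand to every operation.
var canonicalOps = []string{"compact", "rewrite_position_delete_files", "expire_snapshots", "remove_orphans", "rewrite_manifests"}

func parseOperationsSketch(raw string) ([]string, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" || raw == "all" {
		return append([]string(nil), canonicalOps...), nil
	}
	requested := make(map[string]bool)
	for _, op := range strings.Split(raw, ",") {
		op = strings.TrimSpace(op)
		known := false
		for _, c := range canonicalOps {
			if op == c {
				known = true
				break
			}
		}
		if !known {
			return nil, fmt.Errorf("unknown operation %q", op)
		}
		requested[op] = true
	}
	ops := make([]string, 0, len(requested))
	for _, c := range canonicalOps {
		if requested[c] {
			ops = append(ops, c)
		}
	}
	return ops, nil
}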
@@ -848,6 +850,35 @@ func TestParseConfigApplyDeletes(t *testing.T) {
}
}
func TestNormalizeDetectionConfigUsesSharedDefaults(t *testing.T) {
config := normalizeDetectionConfig(Config{})
if config.TargetFileSizeBytes != defaultTargetFileSizeMB*1024*1024 {
t.Fatalf("expected TargetFileSizeBytes default, got %d", config.TargetFileSizeBytes)
}
if config.DeleteTargetFileSizeBytes != defaultDeleteTargetFileSizeMB*1024*1024 {
t.Fatalf("expected DeleteTargetFileSizeBytes default, got %d", config.DeleteTargetFileSizeBytes)
}
if config.DeleteMinInputFiles != defaultDeleteMinInputFiles {
t.Fatalf("expected DeleteMinInputFiles default, got %d", config.DeleteMinInputFiles)
}
if config.DeleteMaxFileGroupSizeBytes != defaultDeleteMaxGroupSizeMB*1024*1024 {
t.Fatalf("expected DeleteMaxFileGroupSizeBytes default, got %d", config.DeleteMaxFileGroupSizeBytes)
}
if config.DeleteMaxOutputFiles != defaultDeleteMaxOutputFiles {
t.Fatalf("expected DeleteMaxOutputFiles default, got %d", config.DeleteMaxOutputFiles)
}
if config.OrphanOlderThanHours != defaultOrphanOlderThanHours {
t.Fatalf("expected OrphanOlderThanHours default, got %d", config.OrphanOlderThanHours)
}
if config.SnapshotRetentionHours != defaultSnapshotRetentionHours {
t.Fatalf("expected SnapshotRetentionHours default, got %d", config.SnapshotRetentionHours)
}
if config.MaxSnapshotsToKeep != defaultMaxSnapshotsToKeep {
t.Fatalf("expected MaxSnapshotsToKeep default, got %d", config.MaxSnapshotsToKeep)
}
}
func TestCollectPositionDeletes(t *testing.T) {
fs, client := startFakeFiler(t)

View File

@@ -323,6 +323,10 @@ func (h *Handler) rewriteManifests(
if err != nil {
return "", nil, fmt.Errorf("load metadata: %w", err)
}
predicate, err := parsePartitionPredicate(config.Where, meta)
if err != nil {
return "", nil, err
}
currentSnap := meta.CurrentSnapshot()
if currentSnap == nil || currentSnap.ManifestList == "" {
@@ -349,10 +353,6 @@ func (h *Handler) rewriteManifests(
}
}
if int64(len(dataManifests)) < config.MinManifestsToRewrite {
return fmt.Sprintf("only %d data manifests, below threshold of %d", len(dataManifests), config.MinManifestsToRewrite), nil, nil
}
// Collect all entries from data manifests, grouped by partition spec ID
// so we write one merged manifest per spec (required for spec-evolved tables).
type specEntries struct {
@@ -363,10 +363,9 @@ func (h *Handler) rewriteManifests(
specMap := make(map[int32]*specEntries)
// Build a lookup from spec ID to PartitionSpec
specByID := make(map[int]iceberg.PartitionSpec)
for _, ps := range meta.PartitionSpecs() {
specByID[ps.ID()] = ps
}
specByID := specByID(meta)
var carriedDataManifests []iceberg.ManifestFile
var manifestsRewritten int64
for _, mf := range dataManifests {
manifestData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, mf.FilePath())
@@ -378,6 +377,28 @@ func (h *Handler) rewriteManifests(
return "", nil, fmt.Errorf("parse manifest %s: %w", mf.FilePath(), err)
}
if predicate != nil {
spec, found := specByID[int(mf.PartitionSpecID())]
if !found {
return "", nil, fmt.Errorf("partition spec %d not found in table metadata", mf.PartitionSpecID())
}
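// Rewrite a manifest only when every entry matches the predicate;
// partially matching or empty manifests are carried over untouched.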
allMatch := len(entries) > 0
for _, entry := range entries {
match, err := predicate.Matches(spec, entry.DataFile().Partition())
if err != nil {
return "", nil, err
}
if !match {
allMatch = false
break
}
}
if !allMatch {
carriedDataManifests = append(carriedDataManifests, mf)
continue
}
}
sid := mf.PartitionSpecID()
se, ok := specMap[sid]
if !ok {
@@ -389,6 +410,11 @@ func (h *Handler) rewriteManifests(
specMap[sid] = se
}
se.entries = append(se.entries, entries...)
manifestsRewritten++
}
if manifestsRewritten < config.MinManifestsToRewrite {
return fmt.Sprintf("only %d data manifests, below threshold of %d", manifestsRewritten, config.MinManifestsToRewrite), nil, nil
}
if len(specMap) == 0 {
@@ -425,6 +451,7 @@ func (h *Handler) rewriteManifests(
// Write one merged manifest per partition spec
var newManifests []iceberg.ManifestFile
newManifests = append(newManifests, carriedDataManifests...)
totalEntries := 0
for _, se := range specMap {
totalEntries += len(se.entries)
@@ -514,11 +541,11 @@ func (h *Handler) rewriteManifests(
committed = true
metrics := map[string]int64{
MetricManifestsRewritten: int64(len(dataManifests)),
MetricManifestsRewritten: manifestsRewritten,
MetricEntriesTotal: int64(totalEntries),
MetricDurationMs: time.Since(start).Milliseconds(),
}
return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", len(dataManifests), len(specMap), totalEntries), metrics, nil
return fmt.Sprintf("rewrote %d manifests into %d (%d entries)", manifestsRewritten, len(specMap), totalEntries), metrics, nil
}
// ---------------------------------------------------------------------------

View File

@@ -177,7 +177,7 @@ func buildPlanningIndexFromManifests(
}
if operationRequested(ops, "compact") {
eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config)
eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config, meta, nil)
if err != nil {
return nil, err
}

View File

@@ -0,0 +1,311 @@
package iceberg
import (
"fmt"
"regexp"
"strconv"
"strings"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
)
var (
whereEqualsPattern = regexp.MustCompile(`^([A-Za-z_][A-Za-z0-9_]*)\s*=\s*(.+)$`)
whereInPattern = regexp.MustCompile(`^(?i)([A-Za-z_][A-Za-z0-9_]*)\s+IN\s*\((.*)\)$`)
)
type whereClause struct {
Field string
Literals []string
}
type partitionPredicate struct {
Clauses []whereClause
}
func validateWhereOperations(where string, ops []string) error {
if strings.TrimSpace(where) == "" {
return nil
}
for _, op := range ops {
switch op {
case "compact", "rewrite_manifests", "rewrite_position_delete_files":
continue
default:
return fmt.Errorf("where filter is only supported for compact, rewrite_position_delete_files, and rewrite_manifests")
}
}
return nil
}
func parsePartitionPredicate(where string, meta table.Metadata) (*partitionPredicate, error) {
where = strings.TrimSpace(where)
if where == "" {
return nil, nil
}
if meta == nil {
return nil, fmt.Errorf("where filter requires table metadata")
}
specs := meta.PartitionSpecs()
if len(specs) == 0 || meta.PartitionSpec().IsUnpartitioned() {
return nil, fmt.Errorf("where filter is not supported for unpartitioned tables")
}
rawClauses := splitWhereConjunction(where)
clauses := make([]whereClause, 0, len(rawClauses))
for _, raw := range rawClauses {
clause, err := parseWhereClause(raw)
if err != nil {
return nil, err
}
clauses = append(clauses, clause)
}
// Validate against the current partition spec only. Historical specs may
// lack fields added by later partition evolution; per-entry matching in
// Matches() handles those gracefully.
currentSpec := meta.PartitionSpec()
for _, clause := range clauses {
if !specHasFieldByName(currentSpec, clause.Field) {
return nil, fmt.Errorf("where field %q is not present in current partition spec %d", clause.Field, currentSpec.ID())
}
}
return &partitionPredicate{Clauses: clauses}, nil
}
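
A usage sketch (hypothetical; assumes meta describes a table partitioned by identity transforms on "name" and "region"). Clauses AND together, while the literals inside a single IN list are alternatives:

// predicate.Clauses[0] == {Field: "name", Literals: ["'us'"]}
// predicate.Clauses[1] == {Field: "region", Literals: ["'eu'", "'us'"]}
predicate, err := parsePartitionPredicate("name = 'us' AND region IN ('eu', 'us')", meta)
if err != nil {
	return err // unpartitioned table, unknown field, or malformed clause
}
_ = predicate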
func splitWhereConjunction(where string) []string {
// Quote-aware split: only split on AND that appears outside quotes.
var parts []string
var current strings.Builder
var quote rune
runes := []rune(where)
for i := 0; i < len(runes); i++ {
r := runes[i]
if quote != 0 {
current.WriteRune(r)
if r == quote {
quote = 0
}
continue
}
if r == '\'' || r == '"' {
quote = r
current.WriteRune(r)
continue
}
// Check for case-insensitive AND surrounded by whitespace.
if (r == 'A' || r == 'a') && i+3 < len(runes) {
candidate := string(runes[i : i+3])
if strings.EqualFold(candidate, "AND") {
before := i > 0 && isWhitespace(runes[i-1])
after := i+3 < len(runes) && isWhitespace(runes[i+3])
if before && after {
part := strings.TrimSpace(current.String())
if part != "" {
parts = append(parts, part)
}
current.Reset()
i += 3 // skip "AND" + the after-space will be consumed next iteration
continue
}
}
}
current.WriteRune(r)
}
if part := strings.TrimSpace(current.String()); part != "" {
parts = append(parts, part)
}
return parts
}
func isWhitespace(r rune) bool {
return r == ' ' || r == '\t' || r == '\n' || r == '\r'
}
func parseWhereClause(raw string) (whereClause, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return whereClause{}, fmt.Errorf("empty where clause")
}
if matches := whereInPattern.FindStringSubmatch(raw); matches != nil {
literals, err := splitLiteralList(matches[2])
if err != nil {
return whereClause{}, err
}
if len(literals) == 0 {
return whereClause{}, fmt.Errorf("empty IN list in where clause %q", raw)
}
return whereClause{Field: matches[1], Literals: literals}, nil
}
if matches := whereEqualsPattern.FindStringSubmatch(raw); matches != nil {
return whereClause{Field: matches[1], Literals: []string{strings.TrimSpace(matches[2])}}, nil
}
return whereClause{}, fmt.Errorf("unsupported where clause %q", raw)
}
func splitLiteralList(raw string) ([]string, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return nil, nil
}
var (
literals []string
current strings.Builder
quote rune
)
for _, r := range raw {
switch {
case quote != 0:
current.WriteRune(r)
if r == quote {
quote = 0
}
case r == '\'' || r == '"':
quote = r
current.WriteRune(r)
case r == ',':
literal := strings.TrimSpace(current.String())
if literal != "" {
literals = append(literals, literal)
}
current.Reset()
default:
current.WriteRune(r)
}
}
if quote != 0 {
return nil, fmt.Errorf("unterminated quoted literal in IN list")
}
if literal := strings.TrimSpace(current.String()); literal != "" {
literals = append(literals, literal)
}
return literals, nil
}
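
A worked example of the quote handling (input illustrative):

// Commas inside quotes are preserved; bare tokens are whitespace-trimmed.
lits, err := splitLiteralList("'sales, east', 'eng', raw")
// lits == []string{"'sales, east'", "'eng'", "raw"}, err == nil
_, _ = lits, err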
func specHasFieldByName(spec iceberg.PartitionSpec, fieldName string) bool {
for field := range spec.Fields() {
if field.Name == fieldName {
return true
}
}
return false
}
func specByID(meta table.Metadata) map[int]iceberg.PartitionSpec {
result := make(map[int]iceberg.PartitionSpec)
if meta == nil {
return result
}
for _, spec := range meta.PartitionSpecs() {
result[spec.ID()] = spec
}
return result
}
func (p *partitionPredicate) Matches(spec iceberg.PartitionSpec, partition map[int]any) (bool, error) {
if p == nil {
return true, nil
}
valuesByName := make(map[string]any)
for field := range spec.Fields() {
if value, ok := partition[field.FieldID]; ok {
valuesByName[field.Name] = value
}
}
for _, clause := range p.Clauses {
actual, ok := valuesByName[clause.Field]
if !ok {
// Field not present in this spec (e.g. an older spec predating the
// partition evolution that added it). Treat the entry as out of scope
// rather than erroring.
return false, nil
}
matched := false
for _, literal := range clause.Literals {
ok, err := literalMatchesActual(literal, actual)
if err != nil {
return false, fmt.Errorf("where field %q: %w", clause.Field, err)
}
if ok {
matched = true
break
}
}
if !matched {
return false, nil
}
}
return true, nil
}
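
A matching sketch (field IDs illustrative), mirroring how manifest entries key partition values by partition field ID rather than by source column ID:

spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
	SourceID: 2, FieldID: 1000, Name: "name", Transform: iceberg.IdentityTransform{},
})
p := &partitionPredicate{Clauses: []whereClause{{Field: "name", Literals: []string{"'us'"}}}}
match, err := p.Matches(spec, map[int]any{1000: "us"})
// match == true, err == nil; a partition map keyed by SourceID (2) would not match.
_, _ = match, err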
func literalMatchesActual(raw string, actual any) (bool, error) {
raw = strings.TrimSpace(raw)
if raw == "" {
return false, fmt.Errorf("empty literal")
}
switch v := actual.(type) {
case string:
value, err := unquoteLiteral(raw)
if err != nil {
return false, err
}
return v == value, nil
case bool:
value, err := strconv.ParseBool(strings.ToLower(strings.TrimSpace(raw)))
if err != nil {
return false, fmt.Errorf("parse bool literal %q: %w", raw, err)
}
return v == value, nil
case int:
value, err := strconv.ParseInt(raw, 10, 64)
if err != nil {
return false, fmt.Errorf("parse int literal %q: %w", raw, err)
}
return int64(v) == value, nil
case int32:
value, err := strconv.ParseInt(raw, 10, 32)
if err != nil {
return false, fmt.Errorf("parse int32 literal %q: %w", raw, err)
}
return v == int32(value), nil
case int64:
value, err := strconv.ParseInt(raw, 10, 64)
if err != nil {
return false, fmt.Errorf("parse int64 literal %q: %w", raw, err)
}
return v == value, nil
case float32:
value, err := strconv.ParseFloat(raw, 32)
if err != nil {
return false, fmt.Errorf("parse float32 literal %q: %w", raw, err)
}
return v == float32(value), nil
case float64:
value, err := strconv.ParseFloat(raw, 64)
if err != nil {
return false, fmt.Errorf("parse float64 literal %q: %w", raw, err)
}
return v == value, nil
default:
value, err := unquoteLiteral(raw)
if err != nil {
return false, err
}
return fmt.Sprint(actual) == value, nil
}
}
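
The comparison is driven by the Go type of the stored partition value, so each literal is parsed at the width of that value (examples illustrative):

ok, _ := literalMatchesActual("42", int32(42))    // true: parsed as int32
ok, _ = literalMatchesActual("'us'", "us")        // true: quotes stripped
ok, _ = literalMatchesActual("true", true)        // true: strconv.ParseBool
ok, _ = literalMatchesActual("1.5", float64(1.5)) // true: exact float compare
_ = ok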
func unquoteLiteral(raw string) (string, error) {
raw = strings.TrimSpace(raw)
if len(raw) >= 2 {
if (raw[0] == '\'' && raw[len(raw)-1] == '\'') || (raw[0] == '"' && raw[len(raw)-1] == '"') {
return raw[1 : len(raw)-1], nil
}
}
return raw, nil
}

View File

@@ -0,0 +1,287 @@
package iceberg
import (
"bytes"
"context"
"encoding/json"
"fmt"
"path"
"strings"
"testing"
"time"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3tables"
)
type partitionedTestFile struct {
Name string
Partition map[int]any
Rows []struct {
ID int64
Name string
}
}
func populatePartitionedDataTable(
t *testing.T,
fs *fakeFilerServer,
setup tableSetup,
partitionSpec iceberg.PartitionSpec,
manifestGroups [][]partitionedTestFile,
) table.Metadata {
t.Helper()
schema := newTestSchema()
meta, err := table.NewMetadata(schema, &partitionSpec, table.UnsortedSortOrder, "s3://"+setup.BucketName+"/"+setup.tablePath(), nil)
if err != nil {
t.Fatalf("create metadata: %v", err)
}
bucketsPath := s3tables.TablesPath
bucketPath := path.Join(bucketsPath, setup.BucketName)
nsPath := path.Join(bucketPath, setup.Namespace)
tablePath := path.Join(nsPath, setup.TableName)
metaDir := path.Join(tablePath, "metadata")
dataDir := path.Join(tablePath, "data")
version := meta.Version()
var manifestFiles []iceberg.ManifestFile
for idx, group := range manifestGroups {
entries := make([]iceberg.ManifestEntry, 0, len(group))
for _, file := range group {
data := writeTestParquetFile(t, fs, dataDir, file.Name, file.Rows)
dfBuilder, err := iceberg.NewDataFileBuilder(
partitionSpec,
iceberg.EntryContentData,
path.Join("data", file.Name),
iceberg.ParquetFile,
file.Partition,
nil, nil,
int64(len(file.Rows)),
int64(len(data)),
)
if err != nil {
t.Fatalf("build data file %s: %v", file.Name, err)
}
snapID := int64(1)
entries = append(entries, iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &snapID, nil, nil, dfBuilder.Build()))
}
manifestName := fmt.Sprintf("where-manifest-%d.avro", idx+1)
var manifestBuf bytes.Buffer
mf, err := iceberg.WriteManifest(path.Join("metadata", manifestName), &manifestBuf, version, partitionSpec, schema, 1, entries)
if err != nil {
t.Fatalf("write manifest %d: %v", idx+1, err)
}
fs.putEntry(metaDir, manifestName, &filer_pb.Entry{
Name: manifestName,
Content: manifestBuf.Bytes(),
Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(manifestBuf.Len())},
})
manifestFiles = append(manifestFiles, mf)
}
var manifestListBuf bytes.Buffer
seqNum := int64(1)
if err := iceberg.WriteManifestList(version, &manifestListBuf, 1, nil, &seqNum, 0, manifestFiles); err != nil {
t.Fatalf("write manifest list: %v", err)
}
fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{
Name: "snap-1.avro",
Content: manifestListBuf.Bytes(),
Attributes: &filer_pb.FuseAttributes{Mtime: time.Now().Unix(), FileSize: uint64(manifestListBuf.Len())},
})
builder, err := table.MetadataBuilderFromBase(meta, "s3://"+setup.BucketName+"/"+setup.tablePath())
if err != nil {
t.Fatalf("metadata builder: %v", err)
}
snapshot := table.Snapshot{SnapshotID: 1, TimestampMs: time.Now().UnixMilli(), ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}
if err := builder.AddSnapshot(&snapshot); err != nil {
t.Fatalf("add snapshot: %v", err)
}
if err := builder.SetSnapshotRef(table.MainBranch, 1, table.BranchRef); err != nil {
t.Fatalf("set snapshot ref: %v", err)
}
meta, err = builder.Build()
if err != nil {
t.Fatalf("build metadata: %v", err)
}
fullMetadataJSON, _ := json.Marshal(meta)
internalMeta := map[string]interface{}{
"metadataVersion": 1,
"metadataLocation": "metadata/v1.metadata.json",
"metadata": map[string]interface{}{"fullMetadata": json.RawMessage(fullMetadataJSON)},
}
xattr, _ := json.Marshal(internalMeta)
fs.putEntry(bucketsPath, setup.BucketName, &filer_pb.Entry{
Name: setup.BucketName,
IsDirectory: true,
Extended: map[string][]byte{s3tables.ExtendedKeyTableBucket: []byte("true")},
})
fs.putEntry(bucketPath, setup.Namespace, &filer_pb.Entry{Name: setup.Namespace, IsDirectory: true})
fs.putEntry(nsPath, setup.TableName, &filer_pb.Entry{
Name: setup.TableName,
IsDirectory: true,
Extended: map[string][]byte{
s3tables.ExtendedKeyMetadata: xattr,
s3tables.ExtendedKeyMetadataVersion: metadataVersionXattr(1),
},
})
return meta
}
func TestValidateWhereOperations(t *testing.T) {
if err := validateWhereOperations("name = 'us'", []string{"compact", "rewrite_manifests"}); err != nil {
t.Fatalf("unexpected validation error: %v", err)
}
if err := validateWhereOperations("name = 'us'", []string{"expire_snapshots"}); err == nil {
t.Fatal("expected where validation to reject expire_snapshots")
}
}
func TestSplitWhereConjunctionQuoteAware(t *testing.T) {
cases := []struct {
input string
expected []string
}{
{"a = 1 AND b = 2", []string{"a = 1", "b = 2"}},
{"a = 'research AND dev'", []string{"a = 'research AND dev'"}},
{"a IN ('sales AND marketing', 'eng') AND b = 2", []string{"a IN ('sales AND marketing', 'eng')", "b = 2"}},
{"a = 1 and b = 2", []string{"a = 1", "b = 2"}},
{"a = 'x' AND b = \"y AND z\"", []string{"a = 'x'", "b = \"y AND z\""}},
}
for _, tc := range cases {
got := splitWhereConjunction(tc.input)
if len(got) != len(tc.expected) {
t.Errorf("splitWhereConjunction(%q) = %v, want %v", tc.input, got, tc.expected)
continue
}
for i := range got {
if got[i] != tc.expected[i] {
t.Errorf("splitWhereConjunction(%q)[%d] = %q, want %q", tc.input, i, got[i], tc.expected[i])
}
}
}
}
func TestPartitionPredicateMatchesUsesPartitionFieldIDs(t *testing.T) {
spec := iceberg.NewPartitionSpec(iceberg.PartitionField{
SourceID: 2,
FieldID: 1000,
Name: "name",
Transform: iceberg.IdentityTransform{},
})
predicate := &partitionPredicate{Clauses: []whereClause{{Field: "name", Literals: []string{"'us'"}}}}
match, err := predicate.Matches(spec, map[int]any{2: "us"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if match {
t.Fatal("expected source-column key to not match partition predicate")
}
}
func TestCompactDataFilesWhereFilter(t *testing.T) {
fs, client := startFakeFiler(t)
partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
SourceID: 2,
FieldID: 1000,
Name: "name",
Transform: iceberg.IdentityTransform{},
})
setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"}
populatePartitionedDataTable(t, fs, setup, partitionSpec, [][]partitionedTestFile{
{
{Name: "us-1.parquet", Partition: map[int]any{1000: "us"}, Rows: []struct {
ID int64
Name string
}{{1, "us"}}},
},
{
{Name: "us-2.parquet", Partition: map[int]any{1000: "us"}, Rows: []struct {
ID int64
Name string
}{{2, "us"}}},
},
{
{Name: "eu-1.parquet", Partition: map[int]any{1000: "eu"}, Rows: []struct {
ID int64
Name string
}{{3, "eu"}}},
{Name: "eu-2.parquet", Partition: map[int]any{1000: "eu"}, Rows: []struct {
ID int64
Name string
}{{4, "eu"}}},
},
})
handler := NewHandler(nil)
config := Config{
TargetFileSizeBytes: 256 * 1024 * 1024,
MinInputFiles: 2,
MaxCommitRetries: 3,
Where: "name = 'us'",
}
result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil)
if err != nil {
t.Fatalf("compactDataFiles: %v", err)
}
if !strings.Contains(result, "compacted 2 files into 1") {
t.Fatalf("unexpected result: %q", result)
}
meta, _, err := loadCurrentMetadata(context.Background(), client, setup.BucketName, setup.tablePath())
if err != nil {
t.Fatalf("loadCurrentMetadata: %v", err)
}
manifests, err := loadCurrentManifests(context.Background(), client, setup.BucketName, setup.tablePath(), meta)
if err != nil {
t.Fatalf("loadCurrentManifests: %v", err)
}
var liveDataPaths []string
for _, mf := range manifests {
if mf.ManifestContent() != iceberg.ManifestContentData {
continue
}
manifestData, err := loadFileByIcebergPath(context.Background(), client, setup.BucketName, setup.tablePath(), mf.FilePath())
if err != nil {
t.Fatalf("load data manifest: %v", err)
}
entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true)
if err != nil {
t.Fatalf("read data manifest: %v", err)
}
for _, entry := range entries {
liveDataPaths = append(liveDataPaths, entry.DataFile().FilePath())
}
}
if len(liveDataPaths) != 3 {
t.Fatalf("expected 3 live data files after filtered compaction, got %v", liveDataPaths)
}
var compactedCount int
for _, p := range liveDataPaths {
switch {
case strings.HasPrefix(p, "data/compact-"):
compactedCount++
case p == "data/eu-1.parquet", p == "data/eu-2.parquet":
default:
t.Fatalf("unexpected live data file %q", p)
}
}
if compactedCount != 1 {
t.Fatalf("expected exactly one compacted file, got %d in %v", compactedCount, liveDataPaths)
}
}