diff --git a/weed/plugin/worker/iceberg/compact.go b/weed/plugin/worker/iceberg/compact.go index a07bce9cc..acc4adc51 100644 --- a/weed/plugin/worker/iceberg/compact.go +++ b/weed/plugin/worker/iceberg/compact.go @@ -58,6 +58,11 @@ func (h *Handler) compactDataFiles( return "no current snapshot", nil, nil } + rewritePlan, err := resolveCompactionRewritePlan(config, meta) + if err != nil { + return "", nil, fmt.Errorf("resolve rewrite strategy: %w", err) + } + // Read manifest list manifestListData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, currentSnap.ManifestList) if err != nil { @@ -182,9 +187,12 @@ func (h *Handler) compactDataFiles( } // Build compaction bins: group small data files by partition. - bins := buildCompactionBins(candidateEntries, config.TargetFileSizeBytes, minInputFiles) + targetSize := compactionTargetSizeForPlan(config, rewritePlan) + bins := buildCompactionBins(candidateEntries, targetSize, minInputFiles) + initialBinCount := len(bins) + bins = filterCompactionBinsByPlan(bins, config, rewritePlan) if len(bins) == 0 { - return "no files eligible for compaction", nil, nil + return compactionNoEligibleMessage(config, rewritePlan, initialBinCount), nil, nil } // Build a lookup from spec ID to PartitionSpec for per-bin manifest writing. 
@@ -256,7 +264,13 @@ func (h *Handler) compactDataFiles( mergedFileName := fmt.Sprintf("compact-%d-%d-%s-%d.parquet", snapshotID, newSnapID, artifactSuffix, binIdx) mergedFilePath := path.Join("data", mergedFileName) - mergedData, recordCount, err := mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries, positionDeletes, eqDeleteGroups, schema) + var mergedData []byte + var recordCount int64 + if rewritePlan != nil && rewritePlan.strategy == "sort" { + mergedData, recordCount, err = mergeParquetFilesSorted(ctx, filerClient, bucketName, tablePath, bin.Entries, positionDeletes, eqDeleteGroups, schema, rewritePlan) + } else { + mergedData, recordCount, err = mergeParquetFiles(ctx, filerClient, bucketName, tablePath, bin.Entries, positionDeletes, eqDeleteGroups, schema) + } if err != nil { glog.Warningf("iceberg compact: failed to merge bin %d (%d files): %v", binIdx, len(bin.Entries), err) goto binDone @@ -473,10 +487,11 @@ func (h *Handler) compactDataFiles( Summary: &table.Summary{ Operation: table.OpReplace, Properties: map[string]string{ - "maintenance": "compact_data_files", - "merged-files": fmt.Sprintf("%d", totalMerged), - "new-files": fmt.Sprintf("%d", len(newManifestEntries)), - "compaction-bins": fmt.Sprintf("%d", len(bins)), + "maintenance": "compact_data_files", + "merged-files": fmt.Sprintf("%d", totalMerged), + "new-files": fmt.Sprintf("%d", len(newManifestEntries)), + "compaction-bins": fmt.Sprintf("%d", len(bins)), + "rewrite-strategy": rewritePlan.strategy, }, }, SchemaID: func() *int { @@ -484,6 +499,9 @@ func (h *Handler) compactDataFiles( return &id }(), } + if rewritePlan != nil && rewritePlan.strategy == "sort" { + newSnapshot.Summary.Properties["sort-fields"] = rewritePlan.summaryLabel() + } if err := builder.AddSnapshot(newSnapshot); err != nil { return err } @@ -500,7 +518,11 @@ func (h *Handler) compactDataFiles( MetricBins: int64(len(bins)), MetricDurationMs: time.Since(start).Milliseconds(), } - return 
fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)), metrics, nil + result := fmt.Sprintf("compacted %d files into %d (across %d bins)", totalMerged, len(newManifestEntries), len(bins)) + if rewritePlan != nil && rewritePlan.strategy == "sort" { + result = fmt.Sprintf("%s using %s", result, rewritePlan.summaryLabel()) + } + return result, metrics, nil } // buildCompactionBins groups small data files by partition for bin-packing. @@ -922,109 +944,28 @@ func mergeParquetFiles( return nil, 0, fmt.Errorf("no parquet schema found in %s", entries[0].DataFile().FilePath()) } - // Resolve equality delete column indices for each group. - type resolvedEqGroup struct { - colIndices []int - keys map[string]struct{} - } - var resolvedEqGroups []resolvedEqGroup - if len(eqDeleteGroups) > 0 && icebergSchema != nil { - for _, g := range eqDeleteGroups { - indices, resolveErr := resolveEqualityColIndices(parquetSchema, g.FieldIDs, icebergSchema) - if resolveErr != nil { - firstReader.Close() - return nil, 0, fmt.Errorf("resolve equality columns: %w", resolveErr) - } - resolvedEqGroups = append(resolvedEqGroups, resolvedEqGroup{colIndices: indices, keys: g.Keys}) - } + resolvedEqGroups, err := resolveEqualityDeleteGroupsForSchema(parquetSchema, eqDeleteGroups, icebergSchema) + if err != nil { + firstReader.Close() + return nil, 0, fmt.Errorf("resolve equality columns: %w", err) } var outputBuf bytes.Buffer writer := parquet.NewWriter(&outputBuf, parquetSchema) - var totalRows int64 - rows := make([]parquet.Row, 256) - hasEqDeletes := len(resolvedEqGroups) > 0 - - // drainReader streams rows from reader into writer, filtering out deleted - // rows. source is the data file path (used for error messages and - // position delete lookups). - drainReader := func(reader *parquet.Reader, source string) error { - defer reader.Close() - - // Normalize source path so it matches the normalized keys in positionDeletes. 
- normalizedSource := normalizeIcebergPath(source, bucketName, tablePath) - posDeletes := positionDeletes[normalizedSource] - posDeleteIdx := 0 - var absolutePos int64 - - for { - select { - case <-ctx.Done(): - return ctx.Err() - default: + drainReader := func(reader *parquet.Reader, source string) (int64, error) { + return visitFilteredParquetRows(ctx, reader, source, bucketName, tablePath, positionDeletes, resolvedEqGroups, func(filtered []parquet.Row) error { + if _, err := writer.WriteRows(filtered); err != nil { + return fmt.Errorf("write rows from %s: %w", source, err) } - n, readErr := reader.ReadRows(rows) - if n > 0 { - // Filter rows if we have any deletes - if len(posDeletes) > 0 || hasEqDeletes { - writeIdx := 0 - for i := 0; i < n; i++ { - rowPos := absolutePos + int64(i) - - // Check position deletes (sorted, so advance index) - if len(posDeletes) > 0 { - for posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] < rowPos { - posDeleteIdx++ - } - if posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] == rowPos { - posDeleteIdx++ - continue // skip this row - } - } - - // Check equality deletes — each group independently - deleted := false - for _, g := range resolvedEqGroups { - key := buildEqualityKey(rows[i], g.colIndices) - if _, ok := g.keys[key]; ok { - deleted = true - break - } - } - if deleted { - continue // skip this row - } - - rows[writeIdx] = rows[i] - writeIdx++ - } - absolutePos += int64(n) - if writeIdx > 0 { - if _, writeErr := writer.WriteRows(rows[:writeIdx]); writeErr != nil { - return fmt.Errorf("write rows from %s: %w", source, writeErr) - } - totalRows += int64(writeIdx) - } - } else { - if _, writeErr := writer.WriteRows(rows[:n]); writeErr != nil { - return fmt.Errorf("write rows from %s: %w", source, writeErr) - } - totalRows += int64(n) - } - } - if readErr != nil { - if readErr == io.EOF { - return nil - } - return fmt.Errorf("read rows from %s: %w", source, readErr) - } - } + return nil + }) } // Drain the 
first file. firstSource := entries[0].DataFile().FilePath() - if err := drainReader(firstReader, firstSource); err != nil { + totalRows, err := drainReader(firstReader, firstSource) + if err != nil { writer.Close() return nil, 0, err } @@ -1052,10 +993,12 @@ func mergeParquetFiles( return nil, 0, fmt.Errorf("schema mismatch in %s: cannot merge files with different schemas", entry.DataFile().FilePath()) } - if err := drainReader(reader, entry.DataFile().FilePath()); err != nil { + rowsWritten, err := drainReader(reader, entry.DataFile().FilePath()) + if err != nil { writer.Close() return nil, 0, err } + totalRows += rowsWritten // data goes out of scope here, eligible for GC before next iteration. } @@ -1066,6 +1009,202 @@ func mergeParquetFiles( return outputBuf.Bytes(), totalRows, nil } +type resolvedEqDeleteGroup struct { + colIndices []int + keys map[string]struct{} +} + +func resolveEqualityDeleteGroupsForSchema( + parquetSchema *parquet.Schema, + eqDeleteGroups []equalityDeleteGroup, + icebergSchema *iceberg.Schema, +) ([]resolvedEqDeleteGroup, error) { + if len(eqDeleteGroups) == 0 || icebergSchema == nil { + return nil, nil + } + + resolved := make([]resolvedEqDeleteGroup, 0, len(eqDeleteGroups)) + for _, g := range eqDeleteGroups { + indices, err := resolveEqualityColIndices(parquetSchema, g.FieldIDs, icebergSchema) + if err != nil { + return nil, err + } + resolved = append(resolved, resolvedEqDeleteGroup{colIndices: indices, keys: g.Keys}) + } + return resolved, nil +} + +func visitFilteredParquetRows( + ctx context.Context, + reader *parquet.Reader, + source, bucketName, tablePath string, + positionDeletes map[string][]int64, + resolvedEqGroups []resolvedEqDeleteGroup, + onRows func([]parquet.Row) error, +) (int64, error) { + defer reader.Close() + + rows := make([]parquet.Row, 256) + filteredRows := make([]parquet.Row, 0, len(rows)) + normalizedSource := normalizeIcebergPath(source, bucketName, tablePath) + posDeletes := 
positionDeletes[normalizedSource] + posDeleteIdx := 0 + var absolutePos int64 + var totalRows int64 + + for { + select { + case <-ctx.Done(): + return totalRows, ctx.Err() + default: + } + + n, readErr := reader.ReadRows(rows) + if n > 0 { + filteredRows = filteredRows[:0] + for i := 0; i < n; i++ { + rowPos := absolutePos + int64(i) + for posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] < rowPos { + posDeleteIdx++ + } + if posDeleteIdx < len(posDeletes) && posDeletes[posDeleteIdx] == rowPos { + posDeleteIdx++ + continue + } + + deleted := false + for _, g := range resolvedEqGroups { + key := buildEqualityKey(rows[i], g.colIndices) + if _, ok := g.keys[key]; ok { + deleted = true + break + } + } + if deleted { + continue + } + + filteredRows = append(filteredRows, rows[i]) + } + absolutePos += int64(n) + if len(filteredRows) > 0 { + if err := onRows(filteredRows); err != nil { + return totalRows, err + } + totalRows += int64(len(filteredRows)) + } + } + + if readErr != nil { + if readErr == io.EOF { + return totalRows, nil + } + return totalRows, fmt.Errorf("read rows from %s: %w", source, readErr) + } + } +} + +func mergeParquetFilesSorted( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + entries []iceberg.ManifestEntry, + positionDeletes map[string][]int64, + eqDeleteGroups []equalityDeleteGroup, + icebergSchema *iceberg.Schema, + rewritePlan *compactionRewritePlan, +) ([]byte, int64, error) { + if len(entries) == 0 { + return nil, 0, fmt.Errorf("no entries to merge") + } + if rewritePlan == nil || rewritePlan.strategy != "sort" { + return nil, 0, fmt.Errorf("sorted merge requires sort rewrite plan") + } + + firstData, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entries[0].DataFile().FilePath()) + if err != nil { + return nil, 0, fmt.Errorf("read parquet file %s: %w", entries[0].DataFile().FilePath(), err) + } + firstReader := parquet.NewReader(bytes.NewReader(firstData)) + 
parquetSchema := firstReader.Schema() + if parquetSchema == nil { + firstReader.Close() + return nil, 0, fmt.Errorf("no parquet schema found in %s", entries[0].DataFile().FilePath()) + } + + sortingColumns, err := rewritePlan.parquetSortingColumns(parquetSchema) + if err != nil { + firstReader.Close() + return nil, 0, fmt.Errorf("resolve parquet sorting columns: %w", err) + } + + resolvedEqGroups, err := resolveEqualityDeleteGroupsForSchema(parquetSchema, eqDeleteGroups, icebergSchema) + if err != nil { + firstReader.Close() + return nil, 0, fmt.Errorf("resolve equality columns: %w", err) + } + + comparator := parquetSchema.Comparator(sortingColumns...) + var allRows []parquet.Row + + collectRows := func(reader *parquet.Reader, source string) (int64, error) { + return visitFilteredParquetRows(ctx, reader, source, bucketName, tablePath, positionDeletes, resolvedEqGroups, func(filtered []parquet.Row) error { + for _, row := range filtered { + allRows = append(allRows, row.Clone()) + } + return nil + }) + } + + totalRows, err := collectRows(firstReader, entries[0].DataFile().FilePath()) + if err != nil { + return nil, 0, err + } + + for _, entry := range entries[1:] { + select { + case <-ctx.Done(): + return nil, 0, ctx.Err() + default: + } + + data, err := loadFileByIcebergPath(ctx, filerClient, bucketName, tablePath, entry.DataFile().FilePath()) + if err != nil { + return nil, 0, fmt.Errorf("read parquet file %s: %w", entry.DataFile().FilePath(), err) + } + + reader := parquet.NewReader(bytes.NewReader(data)) + if !schemasEqual(parquetSchema, reader.Schema()) { + reader.Close() + return nil, 0, fmt.Errorf("schema mismatch in %s: cannot merge files with different schemas", entry.DataFile().FilePath()) + } + + rowsCollected, err := collectRows(reader, entry.DataFile().FilePath()) + if err != nil { + return nil, 0, err + } + totalRows += rowsCollected + } + + sort.SliceStable(allRows, func(i, j int) bool { + return comparator(allRows[i], allRows[j]) < 0 + }) + + var 
outputBuf bytes.Buffer + writer := parquet.NewWriter(&outputBuf, parquetSchema) + if len(allRows) > 0 { + if _, err := writer.WriteRows(allRows); err != nil { + writer.Close() + return nil, 0, fmt.Errorf("write sorted rows: %w", err) + } + } + if err := writer.Close(); err != nil { + return nil, 0, fmt.Errorf("close writer: %w", err) + } + + return outputBuf.Bytes(), totalRows, nil +} + // compactRandomSuffix returns a short random hex string for use in artifact // filenames to prevent collisions between concurrent runs. func compactRandomSuffix() string { diff --git a/weed/plugin/worker/iceberg/config.go b/weed/plugin/worker/iceberg/config.go index e9c9996d0..22efef049 100644 --- a/weed/plugin/worker/iceberg/config.go +++ b/weed/plugin/worker/iceberg/config.go @@ -22,6 +22,7 @@ const ( defaultDeleteMinInputFiles = 2 defaultDeleteMaxGroupSizeMB = 256 defaultDeleteMaxOutputFiles = 8 + defaultRewriteStrategy = "binpack" defaultMinManifestsToRewrite = 5 minManifestsToRewrite = 2 defaultOperations = "all" @@ -43,6 +44,8 @@ const ( MetricDurationMs = "duration_ms" ) +const bytesPerMB int64 = 1024 * 1024 + // Config holds parsed worker config values. 
type Config struct { SnapshotRetentionHours int64 @@ -60,7 +63,6 @@ type Config struct { ApplyDeletes bool Where string RewriteStrategy string - SortFields string SortMaxInputBytes int64 } @@ -72,19 +74,18 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { MaxSnapshotsToKeep: readInt64Config(values, "max_snapshots_to_keep", defaultMaxSnapshotsToKeep), OrphanOlderThanHours: readInt64Config(values, "orphan_older_than_hours", defaultOrphanOlderThanHours), MaxCommitRetries: readInt64Config(values, "max_commit_retries", defaultMaxCommitRetries), - TargetFileSizeBytes: readInt64Config(values, "target_file_size_mb", defaultTargetFileSizeMB) * 1024 * 1024, + TargetFileSizeBytes: readSizeMBConfig(values, "target_file_size_mb", defaultTargetFileSizeMB), MinInputFiles: readInt64Config(values, "min_input_files", defaultMinInputFiles), - DeleteTargetFileSizeBytes: readInt64Config(values, "delete_target_file_size_mb", defaultDeleteTargetFileSizeMB) * 1024 * 1024, + DeleteTargetFileSizeBytes: readSizeMBConfig(values, "delete_target_file_size_mb", defaultDeleteTargetFileSizeMB), DeleteMinInputFiles: readInt64Config(values, "delete_min_input_files", defaultDeleteMinInputFiles), - DeleteMaxFileGroupSizeBytes: readInt64Config(values, "delete_max_file_group_size_mb", defaultDeleteMaxGroupSizeMB) * 1024 * 1024, + DeleteMaxFileGroupSizeBytes: readSizeMBConfig(values, "delete_max_file_group_size_mb", defaultDeleteMaxGroupSizeMB), DeleteMaxOutputFiles: readInt64Config(values, "delete_max_output_files", defaultDeleteMaxOutputFiles), MinManifestsToRewrite: readInt64Config(values, "min_manifests_to_rewrite", defaultMinManifestsToRewrite), Operations: readStringConfig(values, "operations", defaultOperations), ApplyDeletes: readBoolConfig(values, "apply_deletes", true), Where: strings.TrimSpace(readStringConfig(values, "where", "")), - RewriteStrategy: strings.TrimSpace(strings.ToLower(readStringConfig(values, "rewrite_strategy", "binpack"))), - SortFields: 
strings.TrimSpace(readStringConfig(values, "sort_fields", "")), - SortMaxInputBytes: readInt64Config(values, "sort_max_input_mb", 0) * 1024 * 1024, + RewriteStrategy: strings.TrimSpace(strings.ToLower(readStringConfig(values, "rewrite_strategy", defaultRewriteStrategy))), + SortMaxInputBytes: readSizeMBConfig(values, "sort_max_input_mb", 0), } // Clamp the fields that are always defaulted by worker config parsing. @@ -98,15 +99,6 @@ func ParseConfig(values map[string]*plugin_pb.ConfigValue) Config { cfg.MaxCommitRetries = defaultMaxCommitRetries } cfg = applyThresholdDefaults(cfg) - if cfg.RewriteStrategy == "" { - cfg.RewriteStrategy = "binpack" - } - if cfg.RewriteStrategy != "binpack" && cfg.RewriteStrategy != "sort" { - cfg.RewriteStrategy = "binpack" - } - if cfg.SortMaxInputBytes < 0 { - cfg.SortMaxInputBytes = 0 - } return cfg } @@ -132,12 +124,34 @@ func applyThresholdDefaults(cfg Config) Config { if cfg.DeleteMaxOutputFiles <= 0 { cfg.DeleteMaxOutputFiles = defaultDeleteMaxOutputFiles } + if cfg.RewriteStrategy == "" { + cfg.RewriteStrategy = defaultRewriteStrategy + } + if cfg.RewriteStrategy != "binpack" && cfg.RewriteStrategy != "sort" { + cfg.RewriteStrategy = defaultRewriteStrategy + } + if cfg.SortMaxInputBytes < 0 { + cfg.SortMaxInputBytes = 0 + } if cfg.MinManifestsToRewrite < minManifestsToRewrite { cfg.MinManifestsToRewrite = minManifestsToRewrite } return cfg } +func readSizeMBConfig(values map[string]*plugin_pb.ConfigValue, field string, fallbackMB int64) int64 { + mb := readInt64Config(values, field, fallbackMB) + if mb <= 0 { + return 0 + } + maxMB := int64(^uint64(0)>>1) / bytesPerMB + if mb > maxMB { + glog.V(1).Infof("readSizeMBConfig: clamping %q from %d MB to %d MB", field, mb, maxMB) + mb = maxMB + } + return mb * bytesPerMB +} + // parseOperations returns the ordered list of maintenance operations to execute. 
// Order follows Iceberg best practices: compact → rewrite_position_delete_files // → expire_snapshots → remove_orphans → rewrite_manifests. diff --git a/weed/plugin/worker/iceberg/detection.go b/weed/plugin/worker/iceberg/detection.go index 5892cba3c..80e7fcd61 100644 --- a/weed/plugin/worker/iceberg/detection.go +++ b/weed/plugin/worker/iceberg/detection.go @@ -438,7 +438,14 @@ func hasEligibleCompaction( } } - bins := buildCompactionBins(candidateEntries, config.TargetFileSizeBytes, minInputFiles) + rewritePlan, err := resolveCompactionRewritePlan(config, meta) + if err != nil { + return false, fmt.Errorf("resolve rewrite strategy: %w", err) + } + + targetSize := compactionTargetSizeForPlan(config, rewritePlan) + bins := buildCompactionBins(candidateEntries, targetSize, minInputFiles) + bins = filterCompactionBinsByPlan(bins, config, rewritePlan) return len(bins) > 0, nil } diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index 946bc51df..38a83c4a6 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -2025,13 +2025,44 @@ func populateTableWithDeleteFiles( Name string } }, +) table.Metadata { + return populateTableWithDeleteFilesAndSortOrder(t, fs, setup, dataFiles, posDeleteFiles, eqDeleteFiles, table.UnsortedSortOrder) +} + +func populateTableWithDeleteFilesAndSortOrder( + t *testing.T, + fs *fakeFilerServer, + setup tableSetup, + dataFiles []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }, + posDeleteFiles []struct { + Name string + Rows []struct { + FilePath string + Pos int64 + } + }, + eqDeleteFiles []struct { + Name string + FieldIDs []int + Rows []struct { + ID int64 + Name string + } + }, + sortOrder table.SortOrder, ) table.Metadata { t.Helper() schema := newTestSchema() spec := *iceberg.UnpartitionedSpec - meta, err := table.NewMetadata(schema, &spec, table.UnsortedSortOrder, "s3://"+setup.BucketName+"/"+setup.tablePath(), nil) + 
meta, err := table.NewMetadata(schema, &spec, sortOrder, "s3://"+setup.BucketName+"/"+setup.tablePath(), nil) if err != nil { t.Fatalf("create metadata: %v", err) } @@ -2188,7 +2219,20 @@ func populateTableWithDeleteFiles( // Build final metadata with snapshot now := time.Now().UnixMilli() snap := table.Snapshot{SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro"} - meta = buildTestMetadata(t, []table.Snapshot{snap}) + builder, err := table.MetadataBuilderFromBase(meta, "s3://"+setup.BucketName+"/"+setup.tablePath()) + if err != nil { + t.Fatalf("create metadata builder: %v", err) + } + if err := builder.AddSnapshot(&snap); err != nil { + t.Fatalf("add snapshot: %v", err) + } + if err := builder.SetSnapshotRef(table.MainBranch, snap.SnapshotID, table.BranchRef); err != nil { + t.Fatalf("set snapshot ref: %v", err) + } + meta, err = builder.Build() + if err != nil { + t.Fatalf("build metadata: %v", err) + } // Register table structure fullMetadataJSON, _ := json.Marshal(meta) @@ -2325,6 +2369,52 @@ func rewriteDeleteManifestsAsMixed( }) } +func readCompactedRows(t *testing.T, fs *fakeFilerServer, setup tableSetup) []struct { + ID int64 + Name string +} { + t.Helper() + + dataDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "data") + entries := fs.listDir(dataDir) + var mergedContent []byte + for _, e := range entries { + if strings.HasPrefix(e.Name, "compact-") { + mergedContent = e.Content + break + } + } + if mergedContent == nil { + t.Fatal("no merged file found") + } + + reader := parquet.NewReader(bytes.NewReader(mergedContent)) + defer reader.Close() + + type row struct { + ID int64 `parquet:"id"` + Name string `parquet:"name"` + } + var outputRows []struct { + ID int64 + Name string + } + for { + var r row + if err := reader.Read(&r); err != nil { + if err == io.EOF { + break + } + t.Fatalf("read: %v", err) + } + outputRows = append(outputRows, struct { + ID int64 + Name string + }{ID: r.ID, Name: r.Name}) + } + 
return outputRows +} + func TestCompactDataFilesMetrics(t *testing.T) { fs, client := startFakeFiler(t) @@ -2832,6 +2922,335 @@ func TestCompactDataFilesWithMixedDeletes(t *testing.T) { } } +func TestCompactDataFilesSortStrategyUsesAscendingTableSortOrder(t *testing.T) { + fs, client := startFakeFiler(t) + + sortOrder, err := table.NewSortOrder(1, []table.SortField{{ + SourceID: 1, + Transform: iceberg.IdentityTransform{}, + Direction: table.SortASC, + NullOrder: table.NullsFirst, + }}) + if err != nil { + t.Fatalf("new sort order: %v", err) + } + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFilesAndSortOrder(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{3, "charlie"}, {1, "alice"}}}, + {"d2.parquet", []struct { + ID int64 + Name string + }{{4, "delta"}, {2, "bravo"}}}, + }, + nil, + nil, + sortOrder, + ) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 256 * 1024 * 1024, + MinInputFiles: 2, + MaxCommitRetries: 3, + ApplyDeletes: true, + RewriteStrategy: "sort", + } + + result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil) + if err != nil { + t.Fatalf("compactDataFiles: %v", err) + } + if !strings.Contains(result, "using sort") { + t.Fatalf("expected sorted compaction result, got %q", result) + } + + rows := readCompactedRows(t, fs, setup) + if len(rows) != 4 { + t.Fatalf("expected 4 compacted rows, got %d", len(rows)) + } + for i := 1; i < len(rows); i++ { + if rows[i-1].ID > rows[i].ID { + t.Fatalf("rows are not sorted by id: %+v", rows) + } + } +} + +func TestCompactDataFilesSortStrategyUsesTableSortOrder(t *testing.T) { + fs, client := startFakeFiler(t) + + sortOrder, err := table.NewSortOrder(1, []table.SortField{{ + SourceID: 2, + Transform: iceberg.IdentityTransform{}, + Direction: table.SortDESC, + 
NullOrder: table.NullsLast, + }}) + if err != nil { + t.Fatalf("new sort order: %v", err) + } + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFilesAndSortOrder(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{1, "alice"}, {2, "charlie"}}}, + {"d2.parquet", []struct { + ID int64 + Name string + }{{3, "bravo"}, {4, "delta"}}}, + }, + nil, + nil, + sortOrder, + ) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 256 * 1024 * 1024, + MinInputFiles: 2, + MaxCommitRetries: 3, + ApplyDeletes: true, + RewriteStrategy: "sort", + } + + result, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil) + if err != nil { + t.Fatalf("compactDataFiles: %v", err) + } + if !strings.Contains(result, "name desc") { + t.Fatalf("expected table sort order in result, got %q", result) + } + + rows := readCompactedRows(t, fs, setup) + gotNames := make([]string, 0, len(rows)) + for _, row := range rows { + gotNames = append(gotNames, row.Name) + } + expectedNames := []string{"delta", "charlie", "bravo", "alice"} + if len(gotNames) != len(expectedNames) { + t.Fatalf("got %d rows, want %d", len(gotNames), len(expectedNames)) + } + for i := range expectedNames { + if gotNames[i] != expectedNames[i] { + t.Fatalf("names[%d] = %q, want %q (all names: %v)", i, gotNames[i], expectedNames[i], gotNames) + } + } +} + +func TestDetectSkipsSortCompactionBinsAboveCap(t *testing.T) { + fs, client := startFakeFiler(t) + + sortOrder, err := table.NewSortOrder(1, []table.SortField{{ + SourceID: 1, + Transform: iceberg.IdentityTransform{}, + Direction: table.SortASC, + NullOrder: table.NullsFirst, + }}) + if err != nil { + t.Fatalf("new sort order: %v", err) + } + + setup := tableSetup{BucketName: "test-bucket", Namespace: "analytics", TableName: "events"} + 
populateTableWithDeleteFilesAndSortOrder(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"small-1.parquet", []struct { + ID int64 + Name string + }{{1, "alpha"}}}, + {"small-2.parquet", []struct { + ID int64 + Name string + }{{2, "bravo"}}}, + }, + nil, + nil, + sortOrder, + ) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + Operations: "compact", + RewriteStrategy: "sort", + SortMaxInputBytes: 1, + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 0 { + t.Fatalf("expected no sort compaction candidates above the input cap, got %d", len(tables)) + } +} + +func TestDetectSplitsSortCompactionBinsByCap(t *testing.T) { + fs, client := startFakeFiler(t) + + sortOrder, err := table.NewSortOrder(1, []table.SortField{{ + SourceID: 1, + Transform: iceberg.IdentityTransform{}, + Direction: table.SortASC, + NullOrder: table.NullsFirst, + }}) + if err != nil { + t.Fatalf("new sort order: %v", err) + } + + setup := tableSetup{BucketName: "test-bucket", Namespace: "analytics", TableName: "events"} + populateTableWithDeleteFilesAndSortOrder(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"small-1.parquet", []struct { + ID int64 + Name string + }{{1, "alpha"}}}, + {"small-2.parquet", []struct { + ID int64 + Name string + }{{2, "bravo"}}}, + {"small-3.parquet", []struct { + ID int64 + Name string + }{{3, "charlie"}}}, + {"small-4.parquet", []struct { + ID int64 + Name string + }{{4, "delta"}}}, + }, + nil, + nil, + sortOrder, + ) + + meta, _, err := loadCurrentMetadata(context.Background(), client, setup.BucketName, setup.tablePath()) + if err != nil { + t.Fatalf("loadCurrentMetadata: %v", err) + } + manifests, err := loadCurrentManifests(context.Background(), client, setup.BucketName, 
setup.tablePath(), meta) + if err != nil { + t.Fatalf("loadCurrentManifests: %v", err) + } + + var totalSize int64 + var maxFileSize int64 + for _, mf := range manifests { + if mf.ManifestContent() != iceberg.ManifestContentData { + continue + } + manifestData, err := loadFileByIcebergPath(context.Background(), client, setup.BucketName, setup.tablePath(), mf.FilePath()) + if err != nil { + t.Fatalf("load data manifest: %v", err) + } + entries, err := iceberg.ReadManifest(mf, bytes.NewReader(manifestData), true) + if err != nil { + t.Fatalf("read data manifest: %v", err) + } + for _, entry := range entries { + size := entry.DataFile().FileSizeBytes() + totalSize += size + if size > maxFileSize { + maxFileSize = size + } + } + } + if totalSize == 0 || maxFileSize == 0 { + t.Fatal("expected data file sizes to be populated") + } + + capBytes := totalSize / 2 + if capBytes <= maxFileSize { + capBytes = maxFileSize + 1 + } + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: totalSize + 1, + MinInputFiles: 2, + Operations: "compact", + RewriteStrategy: "sort", + SortMaxInputBytes: capBytes, + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected sort compaction candidate to survive cap-based repartitioning, got %d", len(tables)) + } +} + +func TestCompactDataFilesSortStrategyRequiresTableSortOrder(t *testing.T) { + fs, client := startFakeFiler(t) + + setup := tableSetup{BucketName: "tb", Namespace: "ns", TableName: "tbl"} + populateTableWithDeleteFiles(t, fs, setup, + []struct { + Name string + Rows []struct { + ID int64 + Name string + } + }{ + {"d1.parquet", []struct { + ID int64 + Name string + }{{3, "charlie"}, {1, "alice"}}}, + {"d2.parquet", []struct { + ID int64 + Name string + }{{4, "delta"}, {2, "bravo"}}}, + }, + nil, + nil, + ) + + handler := NewHandler(nil) + config 
:= Config{ + TargetFileSizeBytes: 256 * 1024 * 1024, + MinInputFiles: 2, + MaxCommitRetries: 3, + ApplyDeletes: true, + RewriteStrategy: "sort", + } + + _, _, err := handler.compactDataFiles(context.Background(), client, setup.BucketName, setup.tablePath(), config, nil) + if err == nil || !strings.Contains(err.Error(), "table sort order") { + t.Fatalf("expected missing table sort order error, got %v", err) + } +} + func TestRewritePositionDeleteFilesExecution(t *testing.T) { fs, client := startFakeFiler(t) @@ -3197,3 +3616,28 @@ func TestRewritePositionDeleteFilesRebuildsMixedDeleteManifests(t *testing.T) { t.Fatalf("expected equality delete file to be preserved, got %v", eqPaths) } } + +func TestResolveCompactionRewritePlanFallsBackForUnsupportedSortTransform(t *testing.T) { + sortOrder, err := table.NewSortOrder(1, []table.SortField{{ + SourceID: 1, + Transform: iceberg.BucketTransform{NumBuckets: 16}, + Direction: table.SortASC, + NullOrder: table.NullsFirst, + }}) + if err != nil { + t.Fatalf("new sort order: %v", err) + } + + meta, err := table.NewMetadata(newTestSchema(), iceberg.UnpartitionedSpec, sortOrder, "s3://test-bucket/test-table", nil) + if err != nil { + t.Fatalf("new metadata: %v", err) + } + + plan, err := resolveCompactionRewritePlan(Config{RewriteStrategy: "sort"}, meta) + if err != nil { + t.Fatalf("resolveCompactionRewritePlan: %v", err) + } + if plan == nil || plan.strategy != defaultRewriteStrategy { + t.Fatalf("expected fallback to %q for unsupported transform, got %+v", defaultRewriteStrategy, plan) + } +} diff --git a/weed/plugin/worker/iceberg/handler.go b/weed/plugin/worker/iceberg/handler.go index 09d6158b6..f0be05d89 100644 --- a/weed/plugin/worker/iceberg/handler.go +++ b/weed/plugin/worker/iceberg/handler.go @@ -187,19 +187,11 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { { Name: "rewrite_strategy", Label: "Rewrite Strategy", - Description: "binpack keeps the current row order; sort rewrites each compaction bin 
using sort_fields or the table sort order.", + Description: "binpack keeps the existing row order; sort rewrites each compaction bin using the Iceberg table sort order.", FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, Placeholder: "binpack or sort", }, - { - Name: "sort_fields", - Label: "Sort Fields", - Description: "Comma-separated field names for rewrite_strategy=sort. Blank uses the table sort order when present.", - FieldType: plugin_pb.ConfigFieldType_CONFIG_FIELD_TYPE_STRING, - Widget: plugin_pb.ConfigWidget_CONFIG_WIDGET_TEXT, - Placeholder: "id, created_at", - }, { Name: "sort_max_input_mb", Label: "Sort Max Input (MB)", @@ -325,8 +317,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, "apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}}, - "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "binpack"}}, - "sort_fields": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultRewriteStrategy}}, "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, "where": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, }, @@ -355,8 +346,7 @@ func (h *Handler) Descriptor() *plugin_pb.JobTypeDescriptor { "max_commit_retries": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: defaultMaxCommitRetries}}, "operations": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultOperations}}, "apply_deletes": {Kind: &plugin_pb.ConfigValue_BoolValue{BoolValue: true}}, - "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "binpack"}}, - "sort_fields": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, + 
"rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: defaultRewriteStrategy}}, "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 0}}, "where": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: ""}}, }, diff --git a/weed/plugin/worker/iceberg/handler_test.go b/weed/plugin/worker/iceberg/handler_test.go index 3d989880a..f6b889aef 100644 --- a/weed/plugin/worker/iceberg/handler_test.go +++ b/weed/plugin/worker/iceberg/handler_test.go @@ -34,6 +34,12 @@ func TestParseConfig(t *testing.T) { if config.Operations != defaultOperations { t.Errorf("expected Operations=%q, got %q", defaultOperations, config.Operations) } + if config.RewriteStrategy != defaultRewriteStrategy { + t.Errorf("expected RewriteStrategy=%q, got %q", defaultRewriteStrategy, config.RewriteStrategy) + } + if config.SortMaxInputBytes != 0 { + t.Errorf("expected SortMaxInputBytes=0, got %d", config.SortMaxInputBytes) + } } func TestParseOperations(t *testing.T) { @@ -879,6 +885,38 @@ func TestNormalizeDetectionConfigUsesSharedDefaults(t *testing.T) { } } +func TestParseConfigRewriteStrategy(t *testing.T) { + config := ParseConfig(map[string]*plugin_pb.ConfigValue{ + "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "sort"}}, + "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: 64}}, + }) + if config.RewriteStrategy != "sort" { + t.Fatalf("expected sort rewrite strategy, got %q", config.RewriteStrategy) + } + if config.SortMaxInputBytes != 64*1024*1024 { + t.Fatalf("expected SortMaxInputBytes=64MB, got %d", config.SortMaxInputBytes) + } + + config = ParseConfig(map[string]*plugin_pb.ConfigValue{ + "rewrite_strategy": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: "invalid"}}, + "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_Int64Value{Int64Value: -1}}, + }) + if config.RewriteStrategy != defaultRewriteStrategy { + t.Fatalf("expected invalid rewrite strategy to fall back to %q, got %q", 
defaultRewriteStrategy, config.RewriteStrategy) + } + if config.SortMaxInputBytes != 0 { + t.Fatalf("expected negative sort cap to clamp to 0, got %d", config.SortMaxInputBytes) + } + + maxMB := int64(^uint64(0)>>1) / bytesPerMB + config = ParseConfig(map[string]*plugin_pb.ConfigValue{ + "sort_max_input_mb": {Kind: &plugin_pb.ConfigValue_StringValue{StringValue: fmt.Sprintf("%d", maxMB+1)}}, + }) + if config.SortMaxInputBytes != maxMB*bytesPerMB { + t.Fatalf("expected oversized sort cap to clamp to %d bytes, got %d", maxMB*bytesPerMB, config.SortMaxInputBytes) + } +} + func TestCollectPositionDeletes(t *testing.T) { fs, client := startFakeFiler(t) diff --git a/weed/plugin/worker/iceberg/planning_index.go b/weed/plugin/worker/iceberg/planning_index.go index 0015012ab..74e019354 100644 --- a/weed/plugin/worker/iceberg/planning_index.go +++ b/weed/plugin/worker/iceberg/planning_index.go @@ -105,7 +105,9 @@ func (idx *planningIndex) rewriteManifestsEligible(config Config) (bool, bool) { } func compactionPlanningConfigHash(config Config) string { - return fmt.Sprintf("target=%d|min=%d", config.TargetFileSizeBytes, config.MinInputFiles) + return fmt.Sprintf("target=%d|min=%d|strategy=%s|sortcap=%d", + config.TargetFileSizeBytes, config.MinInputFiles, + config.RewriteStrategy, config.SortMaxInputBytes) } func operationRequested(ops []string, wanted string) bool { diff --git a/weed/plugin/worker/iceberg/sort_strategy.go b/weed/plugin/worker/iceberg/sort_strategy.go new file mode 100644 index 000000000..d89e1ee60 --- /dev/null +++ b/weed/plugin/worker/iceberg/sort_strategy.go @@ -0,0 +1,181 @@ +package iceberg + +import ( + "errors" + "fmt" + "strings" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/parquet-go/parquet-go" + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +type compactionRewritePlan struct { + strategy string + sortFields []compactionSortField +} + +type compactionSortField struct { + path string + descending bool + nullsFirst
bool +} + +func resolveCompactionRewritePlan(config Config, meta table.Metadata) (*compactionRewritePlan, error) { + strategy := strings.TrimSpace(strings.ToLower(config.RewriteStrategy)) + if strategy == "" { + strategy = defaultRewriteStrategy + } + if strategy == defaultRewriteStrategy { + return &compactionRewritePlan{strategy: defaultRewriteStrategy}, nil + } + if strategy != "sort" { + return nil, fmt.Errorf("unsupported rewrite strategy %q", config.RewriteStrategy) + } + + sortFields, err := resolveCompactionSortFields(meta) + if err != nil { + if err == errUnsupportedTableSortOrder { + glog.V(2).Infof("iceberg compact: falling back to binpack because the table sort order is not supported") + return &compactionRewritePlan{strategy: defaultRewriteStrategy}, nil + } + return nil, err + } + + return &compactionRewritePlan{ + strategy: strategy, + sortFields: sortFields, + }, nil +} + +var errUnsupportedTableSortOrder = fmt.Errorf("unsupported table sort order") + +func resolveCompactionSortFields(meta table.Metadata) ([]compactionSortField, error) { + if meta == nil || meta.CurrentSchema() == nil { + return nil, fmt.Errorf("sort rewrite requires table schema metadata") + } + + sortOrder := meta.SortOrder() + if sortOrder.IsUnsorted() { + return nil, fmt.Errorf("sort rewrite requires a table sort order") + } + + schema := meta.CurrentSchema() + seen := make(map[string]struct{}) + fields := make([]compactionSortField, 0, sortOrder.Len()) + for sortField := range sortOrder.Fields() { + if _, ok := sortField.Transform.(iceberg.IdentityTransform); !ok { + return nil, errUnsupportedTableSortOrder + } + + field, ok := schema.FindFieldByID(sortField.SourceID) + if !ok { + return nil, fmt.Errorf("table sort field %d not found in schema", sortField.SourceID) + } + if _, ok := field.Type.(iceberg.PrimitiveType); !ok { + return nil, fmt.Errorf("table sort field %q is not a primitive column", field.Name) + } + + columnPath, ok := schema.FindColumnName(sortField.SourceID) + 
if !ok { + columnPath = field.Name + } + normalizedPath := strings.ToLower(columnPath) + if _, exists := seen[normalizedPath]; exists { + return nil, fmt.Errorf("duplicate sort field %q", columnPath) + } + seen[normalizedPath] = struct{}{} + + fields = append(fields, compactionSortField{ + path: columnPath, + descending: sortField.Direction == table.SortDESC, + nullsFirst: sortField.NullOrder == table.NullsFirst, + }) + } + + if len(fields) == 0 { + return nil, fmt.Errorf("sort rewrite requires at least one sort field") + } + return fields, nil +} + +func compactionTargetSizeForPlan(config Config, plan *compactionRewritePlan) int64 { + targetSize := config.TargetFileSizeBytes + if plan != nil && plan.strategy == "sort" && config.SortMaxInputBytes > 0 && config.SortMaxInputBytes < targetSize { + return config.SortMaxInputBytes + } + return targetSize +} + +func filterCompactionBinsByPlan(bins []compactionBin, config Config, plan *compactionRewritePlan) []compactionBin { + if plan == nil || plan.strategy != "sort" || config.SortMaxInputBytes <= 0 { + return bins + } + + filtered := make([]compactionBin, 0, len(bins)) + for _, bin := range bins { + if bin.TotalSize <= config.SortMaxInputBytes { + filtered = append(filtered, bin) + } + } + return filtered +} + +func compactionNoEligibleMessage(config Config, plan *compactionRewritePlan, initialBinCount int) string { + if plan != nil && plan.strategy == "sort" && config.SortMaxInputBytes > 0 && initialBinCount > 0 { + return fmt.Sprintf("no files eligible for sorted compaction within %d MB sort input cap", config.SortMaxInputBytes/bytesPerMB) + } + return "no files eligible for compaction" +} + +func (p *compactionRewritePlan) summaryLabel() string { + if p == nil || p.strategy != "sort" { + return defaultRewriteStrategy + } + + paths := make([]string, 0, len(p.sortFields)) + for _, field := range p.sortFields { + label := field.path + if field.descending { + label += " desc" + } else { + label += " asc" + } + if 
field.nullsFirst { + label += " nulls-first" + } else { + label += " nulls-last" + } + paths = append(paths, label) + } + return fmt.Sprintf("sort (%s)", strings.Join(paths, ", ")) +} + +func (p *compactionRewritePlan) parquetSortingColumns(pqSchema *parquet.Schema) ([]parquet.SortingColumn, error) { + if p == nil || p.strategy != "sort" { + return nil, nil + } + if pqSchema == nil { + return nil, fmt.Errorf("parquet schema is required for sort rewrite") + } + + columns := make([]parquet.SortingColumn, 0, len(p.sortFields)) + for _, field := range p.sortFields { + path := strings.Split(field.path, ".") + if _, ok := pqSchema.Lookup(path...); !ok { + return nil, fmt.Errorf("parquet schema missing sort field %q", field.path) + } + + var column parquet.SortingColumn + if field.descending { + column = parquet.Descending(path...) + } else { + column = parquet.Ascending(path...) + } + if field.nullsFirst { + column = parquet.NullsFirst(column) + } + columns = append(columns, column) + } + return columns, nil +}