diff --git a/weed/plugin/worker/iceberg/detection.go b/weed/plugin/worker/iceberg/detection.go index 49b6552c2..0ffff0c61 100644 --- a/weed/plugin/worker/iceberg/detection.go +++ b/weed/plugin/worker/iceberg/detection.go @@ -3,7 +3,6 @@ package iceberg import ( "bytes" "context" - "encoding/json" "fmt" "path" "strings" @@ -121,30 +120,14 @@ func (h *Handler) scanTablesForMaintenance( continue } - // Parse the internal metadata to get FullMetadata - var internalMeta struct { - MetadataLocation string `json:"metadataLocation,omitempty"` - Metadata *struct { - FullMetadata json.RawMessage `json:"fullMetadata,omitempty"` - } `json:"metadata,omitempty"` - } - if err := json.Unmarshal(metadataBytes, &internalMeta); err != nil { - glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse metadata: %v", bucketName, nsName, tblName, err) - continue - } - if internalMeta.Metadata == nil || len(internalMeta.Metadata.FullMetadata) == 0 { - continue - } - - icebergMeta, err := table.ParseMetadataBytes(internalMeta.Metadata.FullMetadata) + icebergMeta, metadataFileName, planningIndex, err := parseTableMetadataEnvelope(metadataBytes) if err != nil { glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot parse iceberg metadata: %v", bucketName, nsName, tblName, err) continue } tablePath := path.Join(nsName, tblName) - metadataFileName := metadataFileNameFromLocation(internalMeta.MetadataLocation, bucketName, tablePath) - needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, config, ops) + needsWork, err := h.tableNeedsMaintenance(ctx, filerClient, bucketName, tablePath, icebergMeta, metadataFileName, planningIndex, config, ops) if err != nil { glog.V(2).Infof("iceberg maintenance: skipping %s/%s/%s: cannot evaluate maintenance need: %v", bucketName, nsName, tblName, err) continue @@ -192,6 +175,7 @@ func (h *Handler) tableNeedsMaintenance( bucketName, tablePath string, meta table.Metadata, metadataFileName string, + cachedPlanningIndex *planningIndex, config Config, ops []string, ) (bool, error) { @@ -205,22 +189,66 @@ func (h *Handler) tableNeedsMaintenance( } } - loadManifests := func() ([]iceberg.ManifestFile, error) { - return loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta) - } - var currentManifests []iceberg.ManifestFile var manifestsErr error - manifestsLoaded := false + var manifestsLoaded bool getCurrentManifests := func() ([]iceberg.ManifestFile, error) { if manifestsLoaded { return currentManifests, manifestsErr } - currentManifests, manifestsErr = loadManifests() + currentManifests, manifestsErr = loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta) manifestsLoaded = true return currentManifests, manifestsErr } + + computedPlanningIndexes := make(map[string]*planningIndex) + planningIndexLoaded := make(map[string]bool) + planningIndexErrs := make(map[string]error) + getPlanningIndex := func(op string) (*planningIndex, error) { + if planningIndexLoaded[op] { + return computedPlanningIndexes[op], planningIndexErrs[op] + } + planningIndexLoaded[op] = true + + manifests, err := getCurrentManifests() + if err != nil { + planningIndexErrs[op] = err + return nil, err + } + index, err := buildPlanningIndexFromManifests(ctx, filerClient, bucketName, tablePath, meta, config, []string{op}, manifests) + if err != nil { + planningIndexErrs[op] = err + return nil, err + } + computedPlanningIndexes[op] = index + if index != nil { + if err := persistPlanningIndex(ctx, filerClient, bucketName, tablePath, index); err != nil { + glog.V(2).Infof("iceberg maintenance: unable to persist planning index for %s/%s: %v", bucketName, tablePath, err) + } + } + return index, nil + } + checkPlanningIndex := func(op string, eligibleFn func(*planningIndex, Config) (bool, bool)) (bool, error) { + if cachedPlanningIndex != nil && cachedPlanningIndex.matchesSnapshot(meta) { + if eligible, ok := eligibleFn(cachedPlanningIndex, config); ok { + return eligible, nil + } + } + + index, err := getPlanningIndex(op) + if err != nil { + return false, err + } + if index == nil { + return false, nil + } + + eligible, _ := eligibleFn(index, config) + return eligible, nil + } + var opEvalErrors []string + planningIndexErrorReported := false for _, op := range ops { switch op { @@ -228,26 +256,27 @@ func (h *Handler) tableNeedsMaintenance( // Handled by the metadata-only check above. continue case "compact": - manifests, err := getCurrentManifests() + eligible, err := checkPlanningIndex(op, (*planningIndex).compactionEligible) if err != nil { - opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) - continue - } - eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config) - if err != nil { - opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + if !planningIndexErrorReported { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + planningIndexErrorReported = true + } continue } if eligible { return true, nil } case "rewrite_manifests": - manifests, err := getCurrentManifests() + eligible, err := checkPlanningIndex(op, (*planningIndex).rewriteManifestsEligible) if err != nil { - opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + if !planningIndexErrorReported { + opEvalErrors = append(opEvalErrors, fmt.Sprintf("%s: %v", op, err)) + planningIndexErrorReported = true + } continue } - if countDataManifests(manifests) >= config.MinManifestsToRewrite { + if eligible { return true, nil } case "remove_orphans": diff --git a/weed/plugin/worker/iceberg/exec_test.go b/weed/plugin/worker/iceberg/exec_test.go index cbb4c088a..376e9e195 100644 --- a/weed/plugin/worker/iceberg/exec_test.go +++ b/weed/plugin/worker/iceberg/exec_test.go @@ -1255,6 +1255,349 @@ func TestDetectSchedulesManifestRewriteWithoutSnapshotPressure(t *testing.T) { } } +func TestDetectUsesPlanningIndexForRepeatedCompactionScans(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{ + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-3.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + }) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + Operations: "compact", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected 1 compaction candidate, got %d", len(tables)) + } + + tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.Namespace) + tableEntry := fs.getEntry(tableDir, setup.TableName) + if tableEntry == nil { + t.Fatal("table entry not found") + } + + var envelope struct { + PlanningIndex *planningIndex `json:"planningIndex,omitempty"` + } + if err := json.Unmarshal(tableEntry.Extended[s3tables.ExtendedKeyMetadata], &envelope); err != nil { + t.Fatalf("parse table metadata xattr: %v", err) + } + if envelope.PlanningIndex == nil || envelope.PlanningIndex.Compaction == nil { + t.Fatal("expected persisted compaction planning index after first scan") + } + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{ + Name: "snap-1.avro", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(len("broken")), + }, + Content: []byte("broken"), + }) + + tables, err = handler.scanTablesForMaintenance(context.Background(), client, config, "", "", "", 0) + if err != nil { + t.Fatalf("scanTablesForMaintenance with cached planning index failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected cached planning index to preserve 1 compaction candidate, got %d", len(tables)) + } +} + +func TestDetectInvalidatesPlanningIndexWhenCompactionConfigChanges(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{ + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + }) + + handler := NewHandler(nil) + initialConfig := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 3, + Operations: "compact", + } + + tables, err := handler.scanTablesForMaintenance(context.Background(), client, initialConfig, "", "", "", 0) + if err != nil { + t.Fatalf("initial scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 0 { + t.Fatalf("expected no compaction candidates with min_input_files=3, got %d", len(tables)) + } + + updatedConfig := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + Operations: "compact", + } + + tables, err = handler.scanTablesForMaintenance(context.Background(), client, updatedConfig, "", "", "", 0) + if err != nil { + t.Fatalf("updated scanTablesForMaintenance failed: %v", err) + } + if len(tables) != 1 { + t.Fatalf("expected planning index invalidation to yield 1 compaction candidate, got %d", len(tables)) + } +} + +func TestDetectPlanningIndexPreservesUnscannedSections(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{ + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + }) + + handler := NewHandler(nil) + compactConfig := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + Operations: "compact", + } + if _, err := handler.scanTablesForMaintenance(context.Background(), client, compactConfig, "", "", "", 0); err != nil { + t.Fatalf("compact scanTablesForMaintenance failed: %v", err) + } + + rewriteConfig := Config{ + MinManifestsToRewrite: 5, + Operations: "rewrite_manifests", + } + if _, err := handler.scanTablesForMaintenance(context.Background(), client, rewriteConfig, "", "", "", 0); err != nil { + t.Fatalf("rewrite scanTablesForMaintenance failed: %v", err) + } + + tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.Namespace) + tableEntry := fs.getEntry(tableDir, setup.TableName) + if tableEntry == nil { + t.Fatal("table entry not found") + } + + var envelope struct { + PlanningIndex *planningIndex `json:"planningIndex,omitempty"` + } + if err := json.Unmarshal(tableEntry.Extended[s3tables.ExtendedKeyMetadata], &envelope); err != nil { + t.Fatalf("parse table metadata xattr: %v", err) + } + if envelope.PlanningIndex == nil { + t.Fatal("expected persisted planning index") + } + if envelope.PlanningIndex.Compaction == nil { + t.Fatal("expected compaction section to be preserved") + } + if envelope.PlanningIndex.RewriteManifests == nil { + t.Fatal("expected rewrite_manifests section to be added") + } +} + +func TestTableNeedsMaintenanceCachesPlanningIndexBuildError(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{ + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + }) + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + fs.putEntry(metaDir, "snap-1.avro", &filer_pb.Entry{ + Name: "snap-1.avro", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(len("broken")), + }, + Content: []byte("broken"), + }) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + MinManifestsToRewrite: 2, + Operations: "compact,rewrite_manifests", + } + ops, err := parseOperations(config.Operations) + if err != nil { + t.Fatalf("parseOperations: %v", err) + } + + needsWork, err := handler.tableNeedsMaintenance(context.Background(), client, setup.BucketName, setup.tablePath(), meta, "v1.metadata.json", nil, config, ops) + if err == nil { + t.Fatal("expected planning-index build error") + } + if needsWork { + t.Fatal("expected no maintenance result on planning-index build error") + } + if strings.Count(err.Error(), "parse manifest list") != 1 { + t.Fatalf("expected planning-index build error to be reported once, got %q", err) + } +} + +func TestTableNeedsMaintenanceScopesPlanningIndexBuildErrorsPerOperation(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + meta := populateTable(t, fs, setup) + writeCurrentSnapshotManifests(t, fs, setup, meta, [][]iceberg.ManifestEntry{ + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-1.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + makeManifestEntries(t, []testEntrySpec{ + {path: "data/small-2.parquet", size: 1024, partition: map[int]any{}}, + }, 1), + }) + + metaDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath(), "metadata") + fs.putEntry(metaDir, "detect-manifest-1.avro", &filer_pb.Entry{ + Name: "detect-manifest-1.avro", + Attributes: &filer_pb.FuseAttributes{ + Mtime: time.Now().Unix(), + FileSize: uint64(len("broken")), + }, + Content: []byte("broken"), + }) + + handler := NewHandler(nil) + config := Config{ + TargetFileSizeBytes: 4096, + MinInputFiles: 2, + MinManifestsToRewrite: 2, + Operations: "compact,rewrite_manifests", + } + ops, err := parseOperations(config.Operations) + if err != nil { + t.Fatalf("parseOperations: %v", err) + } + + needsWork, err := handler.tableNeedsMaintenance(context.Background(), client, setup.BucketName, setup.tablePath(), meta, "v1.metadata.json", nil, config, ops) + if err != nil { + t.Fatalf("expected rewrite_manifests planning to survive compaction planning error, got %v", err) + } + if !needsWork { + t.Fatal("expected rewrite_manifests maintenance despite compaction planning error") + } +} + +func TestPersistPlanningIndexUsesMetadataXattrCASGuard(t *testing.T) { + fs, client := startFakeFiler(t) + + now := time.Now().UnixMilli() + setup := tableSetup{ + BucketName: "test-bucket", + Namespace: "analytics", + TableName: "events", + Snapshots: []table.Snapshot{ + {SnapshotID: 1, TimestampMs: now, ManifestList: "metadata/snap-1.avro", SequenceNumber: 1}, + }, + } + populateTable(t, fs, setup) + + tableDir := path.Join(s3tables.TablesPath, setup.BucketName, setup.tablePath()) + tableEntry := fs.getEntry(path.Dir(tableDir), path.Base(tableDir)) + if tableEntry == nil { + t.Fatal("table entry not found") + } + + observedExpectedExtended := make(chan map[string][]byte, 1) + fs.beforeUpdate = func(_ *fakeFilerServer, req *filer_pb.UpdateEntryRequest) error { + cloned := make(map[string][]byte, len(req.ExpectedExtended)) + for key, value := range req.ExpectedExtended { + cloned[key] = append([]byte(nil), value...) + } + observedExpectedExtended <- cloned + return nil + } + + err := persistPlanningIndex(context.Background(), client, setup.BucketName, setup.tablePath(), &planningIndex{ + SnapshotID: 1, + ManifestList: "metadata/snap-1.avro", + UpdatedAtMs: time.Now().UnixMilli(), + }) + if err != nil { + t.Fatalf("persistPlanningIndex: %v", err) + } + + var expectedExtended map[string][]byte + select { + case expectedExtended = <-observedExpectedExtended: + default: + t.Fatal("expected persistPlanningIndex to issue an UpdateEntry request") + } + + if got := expectedExtended[s3tables.ExtendedKeyMetadata]; !bytes.Equal(got, tableEntry.Extended[s3tables.ExtendedKeyMetadata]) { + t.Fatal("expected metadata xattr to be included in ExpectedExtended") + } + if got := expectedExtended[s3tables.ExtendedKeyMetadataVersion]; !bytes.Equal(got, tableEntry.Extended[s3tables.ExtendedKeyMetadataVersion]) { + t.Fatal("expected metadata version xattr to be preserved in ExpectedExtended") + } +} + func TestDetectDoesNotScheduleManifestRewriteFromDeleteManifestsOnly(t *testing.T) { fs, client := startFakeFiler(t) diff --git a/weed/plugin/worker/iceberg/planning_index.go b/weed/plugin/worker/iceberg/planning_index.go new file mode 100644 index 000000000..43cc44a48 --- /dev/null +++ b/weed/plugin/worker/iceberg/planning_index.go @@ -0,0 +1,269 @@ +package iceberg + +import ( + "context" + "encoding/json" + "fmt" + "path" + "time" + + "github.com/apache/iceberg-go" + "github.com/apache/iceberg-go/table" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3tables" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type planningIndex struct { + SnapshotID int64 `json:"snapshotId"` + ManifestList string `json:"manifestList,omitempty"` + UpdatedAtMs int64 `json:"updatedAtMs"` + DataManifestCount int64 `json:"dataManifestCount,omitempty"` + Compaction *planningIndexCompaction `json:"compaction,omitempty"` + RewriteManifests *planningIndexRewriteManifests `json:"rewriteManifests,omitempty"` +} + +type planningIndexCompaction struct { + ConfigHash string `json:"configHash"` + Eligible bool `json:"eligible"` +} + +type planningIndexRewriteManifests struct { + Threshold int64 `json:"threshold"` + Eligible bool `json:"eligible"` +} + +type tableMetadataEnvelope struct { + MetadataVersion int `json:"metadataVersion"` + MetadataLocation string `json:"metadataLocation,omitempty"` + Metadata *struct { + FullMetadata json.RawMessage `json:"fullMetadata,omitempty"` + } `json:"metadata,omitempty"` + PlanningIndex json.RawMessage `json:"planningIndex,omitempty"` +} + +func parseTableMetadataEnvelope(metadataBytes []byte) (table.Metadata, string, *planningIndex, error) { + var envelope tableMetadataEnvelope + if err := json.Unmarshal(metadataBytes, &envelope); err != nil { + return nil, "", nil, fmt.Errorf("parse metadata xattr: %w", err) + } + if envelope.Metadata == nil || len(envelope.Metadata.FullMetadata) == 0 { + return nil, "", nil, fmt.Errorf("no fullMetadata in table xattr") + } + + meta, err := table.ParseMetadataBytes(envelope.Metadata.FullMetadata) + if err != nil { + return nil, "", nil, fmt.Errorf("parse iceberg metadata: %w", err) + } + + var index *planningIndex + if len(envelope.PlanningIndex) > 0 { + if err := json.Unmarshal(envelope.PlanningIndex, &index); err != nil { + glog.V(2).Infof("iceberg maintenance: ignoring invalid planning index cache: %v", err) + index = nil + } + } + + metadataFileName := metadataFileNameFromLocation(envelope.MetadataLocation, "", "") + if metadataFileName == "" { + metadataFileName = fmt.Sprintf("v%d.metadata.json", envelope.MetadataVersion) + } + return meta, metadataFileName, index, nil +} + +func (idx *planningIndex) matchesSnapshot(meta table.Metadata) bool { + if idx == nil { + return false + } + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return false + } + return idx.SnapshotID == currentSnap.SnapshotID && idx.ManifestList == currentSnap.ManifestList +} + +func (idx *planningIndex) compactionEligible(config Config) (bool, bool) { + if idx == nil || idx.Compaction == nil { + return false, false + } + if idx.Compaction.ConfigHash != compactionPlanningConfigHash(config) { + return false, false + } + return idx.Compaction.Eligible, true +} + +func (idx *planningIndex) rewriteManifestsEligible(config Config) (bool, bool) { + if idx == nil || idx.RewriteManifests == nil { + return false, false + } + if idx.RewriteManifests.Threshold != config.MinManifestsToRewrite { + return false, false + } + return idx.RewriteManifests.Eligible, true +} + +func compactionPlanningConfigHash(config Config) string { + return fmt.Sprintf("target=%d|min=%d", config.TargetFileSizeBytes, config.MinInputFiles) +} + +func operationRequested(ops []string, wanted string) bool { + for _, op := range ops { + if op == wanted { + return true + } + } + return false +} + +func mergePlanningIndexSections(index, existing *planningIndex) *planningIndex { + if index == nil || existing == nil { + return index + } + if index.SnapshotID != existing.SnapshotID || index.ManifestList != existing.ManifestList { + return index + } + if index.Compaction == nil && existing.Compaction != nil { + compactionCopy := *existing.Compaction + index.Compaction = &compactionCopy + } + if index.RewriteManifests == nil && existing.RewriteManifests != nil { + rewriteCopy := *existing.RewriteManifests + index.RewriteManifests = &rewriteCopy + } + return index +} + +func buildPlanningIndex( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + meta table.Metadata, + config Config, + ops []string, +) (*planningIndex, error) { + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return nil, nil + } + + manifests, err := loadCurrentManifests(ctx, filerClient, bucketName, tablePath, meta) + if err != nil { + return nil, err + } + return buildPlanningIndexFromManifests(ctx, filerClient, bucketName, tablePath, meta, config, ops, manifests) +} + +func buildPlanningIndexFromManifests( + ctx context.Context, + filerClient filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + meta table.Metadata, + config Config, + ops []string, + manifests []iceberg.ManifestFile, +) (*planningIndex, error) { + currentSnap := meta.CurrentSnapshot() + if currentSnap == nil || currentSnap.ManifestList == "" { + return nil, nil + } + + index := &planningIndex{ + SnapshotID: currentSnap.SnapshotID, + ManifestList: currentSnap.ManifestList, + UpdatedAtMs: time.Now().UnixMilli(), + DataManifestCount: countDataManifests(manifests), + } + + if operationRequested(ops, "compact") { + eligible, err := hasEligibleCompaction(ctx, filerClient, bucketName, tablePath, manifests, config) + if err != nil { + return nil, err + } + index.Compaction = &planningIndexCompaction{ + ConfigHash: compactionPlanningConfigHash(config), + Eligible: eligible, + } + } + + if operationRequested(ops, "rewrite_manifests") { + index.RewriteManifests = &planningIndexRewriteManifests{ + Threshold: config.MinManifestsToRewrite, + Eligible: index.DataManifestCount >= config.MinManifestsToRewrite, + } + } + + return index, nil +} + +func persistPlanningIndex( + ctx context.Context, + client filer_pb.SeaweedFilerClient, + bucketName, tablePath string, + index *planningIndex, +) error { + if index == nil { + return nil + } + + tableDir := path.Join(s3tables.TablesPath, bucketName, tablePath) + tableName := path.Base(tableDir) + parentDir := path.Dir(tableDir) + + resp, err := filer_pb.LookupEntry(ctx, client, &filer_pb.LookupDirectoryEntryRequest{ + Directory: parentDir, + Name: tableName, + }) + if err != nil { + return fmt.Errorf("lookup table entry: %w", err) + } + if resp == nil || resp.Entry == nil { + return fmt.Errorf("table entry not found") + } + + existingXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadata] + if !ok || len(existingXattr) == 0 { + return fmt.Errorf("no metadata xattr on table entry") + } + + var internalMeta map[string]json.RawMessage + if err := json.Unmarshal(existingXattr, &internalMeta); err != nil { + return fmt.Errorf("unmarshal metadata xattr: %w", err) + } + if _, _, existingIndex, err := parseTableMetadataEnvelope(existingXattr); err == nil { + index = mergePlanningIndexSections(index, existingIndex) + } + + indexJSON, err := json.Marshal(index) + if err != nil { + return fmt.Errorf("marshal planning index: %w", err) + } + internalMeta["planningIndex"] = indexJSON + + updatedXattr, err := json.Marshal(internalMeta) + if err != nil { + return fmt.Errorf("marshal updated metadata xattr: %w", err) + } + + expectedExtended := map[string][]byte{ + s3tables.ExtendedKeyMetadata: existingXattr, + } + if expectedVersionXattr, ok := resp.Entry.Extended[s3tables.ExtendedKeyMetadataVersion]; ok && len(expectedVersionXattr) > 0 { + expectedExtended[s3tables.ExtendedKeyMetadataVersion] = expectedVersionXattr + } + resp.Entry.Extended[s3tables.ExtendedKeyMetadata] = updatedXattr + _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + Directory: parentDir, + Entry: resp.Entry, + ExpectedExtended: expectedExtended, + }) + if err != nil { + if status.Code(err) == codes.FailedPrecondition { + return nil + } + return fmt.Errorf("update table entry: %w", err) + } + + return nil +}